| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.transforms; |
| |
| import com.google.auto.value.AutoValue; |
| import java.io.Serializable; |
| import java.lang.annotation.Documented; |
| import java.lang.annotation.ElementType; |
| import java.lang.annotation.Retention; |
| import java.lang.annotation.RetentionPolicy; |
| import java.lang.annotation.Target; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.annotations.Experimental.Kind; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.state.State; |
| import org.apache.beam.sdk.state.StateSpec; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.sdk.state.Timer; |
| import org.apache.beam.sdk.state.TimerSpec; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.testing.TestStream; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.transforms.display.HasDisplayData; |
| import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; |
| import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.Row; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| |
| /** |
| * The argument to {@link ParDo} providing the code to use to process elements of the input {@link |
| * org.apache.beam.sdk.values.PCollection}. |
| * |
| * <p>See {@link ParDo} for more explanation, examples of use, and discussion of constraints on |
| * {@code DoFn}s, including their serializability, lack of access to global shared mutable state, |
| * requirements for failure tolerance, and benefits of optimization. |
| * |
| * <p>{@link DoFn DoFns} can be tested by using {@link TestPipeline}. You can verify their |
| * functional correctness in a local test using the {@code DirectRunner} as well as running |
| * integration tests with your production runner of choice. Typically, you can generate the input |
| * data using {@link Create#of} or other transforms. However, if you need to test the behavior of |
| * {@link StartBundle} and {@link FinishBundle} with particular bundle boundaries, you can use |
| * {@link TestStream}. |
| * |
| * <p>Implementations must define a method annotated with {@link ProcessElement} that satisfies the |
| * requirements described there. See the {@link ProcessElement} for details. |
| * |
| * <p>Example usage: |
| * |
| * <pre><code> |
| * {@literal PCollection<String>} lines = ... ; |
| * {@literal PCollection<String>} words = |
| * {@literal lines.apply(ParDo.of(new DoFn<String, String>()} { |
| * {@literal @ProcessElement} |
| * public void processElement({@literal @}Element String element, BoundedWindow window) { |
| * ... |
| * }})); |
| * </code></pre> |
| * |
| * @param <InputT> the type of the (main) input elements |
| * @param <OutputT> the type of the (main) output elements |
| */ |
| public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData { |
| /** |
| * Information accessible while within the {@link StartBundle} method. |
| * |
| * <p>An instance is passed to a {@link StartBundle} method that declares a parameter of this type. |
| * It only exposes the pipeline options; it declares no output methods. |
| */ |
| @SuppressWarnings("ClassCanBeStatic") // Converting class to static is an API change. |
| public abstract class StartBundleContext { |
| /** |
| * Returns the {@code PipelineOptions} specified with the {@link |
| * org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}. |
| */ |
| public abstract PipelineOptions getPipelineOptions(); |
| } |
| |
| /** |
| * Information accessible while within the {@link FinishBundle} method. |
| * |
| * <p>An instance is passed to a {@link FinishBundle} method that declares a parameter of this |
| * type. Both {@code output} overloads require the caller to supply the timestamp and the window |
| * explicitly. |
| */ |
| public abstract class FinishBundleContext { |
| /** |
| * Returns the {@code PipelineOptions} specified with the {@link |
| * org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}. |
| */ |
| public abstract PipelineOptions getPipelineOptions(); |
| |
| /** |
| * Adds the given element to the main output {@code PCollection} at the given timestamp in the |
| * given window. |
| * |
| * <p>Once passed to {@code output} the element should not be modified in any way. |
| * |
| * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from the {@link |
| * FinishBundle} method. |
| * |
| * @param output the element to add; declared {@code @Nullable} |
| * @param timestamp the timestamp to give the element |
| * @param window the window to place the element in |
| */ |
| public abstract void output(@Nullable OutputT output, Instant timestamp, BoundedWindow window); |
| |
| /** |
| * Adds the given element to the output {@code PCollection} with the given tag at the given |
| * timestamp in the given window. |
| * |
| * <p>Once passed to {@code output} the element should not be modified in any way. |
| * |
| * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from the {@link |
| * FinishBundle} method. |
| * |
| * @param tag the tag identifying the output {@code PCollection} |
| * @param output the element to add |
| * @param timestamp the timestamp to give the element |
| * @param window the window to place the element in |
| */ |
| public abstract <T> void output( |
| TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window); |
| } |
| |
| /** |
| * Information accessible to all methods in this {@link DoFn} where the context is in some window. |
| * |
| * <p>{@link ProcessContext} and {@link OnTimerContext} both extend this class. |
| */ |
| public abstract class WindowedContext { |
| /** |
| * Returns the {@code PipelineOptions} specified with the {@link |
| * org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}. |
| */ |
| public abstract PipelineOptions getPipelineOptions(); |
| |
| /** |
| * Adds the given element to the main output {@code PCollection}. |
| * |
| * <p>Once passed to {@code output} the element should not be modified in any way. |
| * |
| * <p>If invoked from {@link ProcessElement}, the output element will have the same timestamp |
| * and be in the same windows as the input element passed to the method annotated with |
| * {@code @ProcessElement}. |
| * |
| * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the |
| * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to |
| * determine what windows the element should be in, throwing an exception if the {@code |
| * WindowFn} attempts to access any information about the input element. The output element will |
| * have a timestamp of negative infinity. |
| * |
| * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle} |
| * or {@link FinishBundle} methods. |
| * |
| * @param output the element to add to the main output |
| */ |
| public abstract void output(OutputT output); |
| |
| /** |
| * Adds the given element to the main output {@code PCollection}, with the given timestamp. |
| * |
| * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way. |
| * |
| * <p>If invoked from {@link ProcessElement}, the timestamp must not be older than the input |
| * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in |
| * the same windows as the input element. |
| * |
| * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the |
| * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to |
| * determine what windows the element should be in, throwing an exception if the {@code |
| * WindowFn} attempts to access any information about the input element except for the |
| * timestamp. |
| * |
| * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle} |
| * or {@link FinishBundle} methods. |
| * |
| * @param output the element to add to the main output |
| * @param timestamp the timestamp to give the output element |
| */ |
| public abstract void outputWithTimestamp(OutputT output, Instant timestamp); |
| |
| /** |
| * Adds the given element to the output {@code PCollection} with the given tag. |
| * |
| * <p>Once passed to {@code output} the element should not be modified in any way. |
| * |
| * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags} to specify the |
| * tags of outputs that it consumes. Non-consumed outputs, e.g., outputs for monitoring purposes |
| * only, don't necessarily need to be specified. |
| * |
| * <p>The output element will have the same timestamp and be in the same windows as the input |
| * element passed to {@link ProcessElement}. |
| * |
| * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the |
| * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to |
| * determine what windows the element should be in, throwing an exception if the {@code |
| * WindowFn} attempts to access any information about the input element. The output element will |
| * have a timestamp of negative infinity. |
| * |
| * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle} |
| * or {@link FinishBundle} methods. |
| * |
| * @param tag the tag identifying the output {@code PCollection} |
| * @param output the element to add to that output |
| * @see ParDo.SingleOutput#withOutputTags |
| */ |
| public abstract <T> void output(TupleTag<T> tag, T output); |
| |
| /** |
| * Adds the given element to the specified output {@code PCollection}, with the given timestamp. |
| * |
| * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way. |
| * |
| * <p>If invoked from {@link ProcessElement}, the timestamp must not be older than the input |
| * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in |
| * the same windows as the input element. |
| * |
| * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the |
| * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to |
| * determine what windows the element should be in, throwing an exception if the {@code |
| * WindowFn} attempts to access any information about the input element except for the |
| * timestamp. |
| * |
| * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle} |
| * or {@link FinishBundle} methods. |
| * |
| * @param tag the tag identifying the output {@code PCollection} |
| * @param output the element to add to that output |
| * @param timestamp the timestamp to give the output element |
| * @see ParDo.SingleOutput#withOutputTags |
| */ |
| public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp); |
| } |
| |
| /** |
| * Information accessible when running a {@link DoFn.ProcessElement} method. |
| * |
| * <p>Extends {@link WindowedContext}, so the output methods declared there are available in |
| * addition to the element, side input, timestamp, pane and watermark accessors declared here. |
| */ |
| public abstract class ProcessContext extends WindowedContext { |
| |
| /** |
| * Returns the input element to be processed. |
| * |
| * <p>The element will not be changed -- it is safe to cache, etc. without copying. |
| * Implementations of the {@link DoFn.ProcessElement} method should not mutate the element. |
| */ |
| public abstract InputT element(); |
| |
| /** |
| * Returns the value of the side input. |
| * |
| * @param view the view identifying the side input to read |
| * @throws IllegalArgumentException if this is not a side input |
| * @see ParDo.SingleOutput#withSideInputs |
| */ |
| public abstract <T> T sideInput(PCollectionView<T> view); |
| |
| /** |
| * Returns the timestamp of the input element. |
| * |
| * <p>See {@link Window} for more information. |
| */ |
| public abstract Instant timestamp(); |
| |
| /** |
| * Returns information about the pane within this window into which the input element has been |
| * assigned. |
| * |
| * <p>Generally all data is in a single, uninteresting pane unless custom triggering and/or late |
| * data has been explicitly requested. See {@link Window} for more information. |
| */ |
| public abstract PaneInfo pane(); |
| |
| /** |
| * Gives the runner a (best-effort) lower bound about the timestamps of future output associated |
| * with the current element. |
| * |
| * <p>If the {@link DoFn} has multiple outputs, the watermark applies to all of them. |
| * |
| * <p>Only splittable {@link DoFn DoFns} are allowed to call this method. It is safe to call |
| * this method from a different thread than the one running {@link ProcessElement}, but all |
| * calls must finish before {@link ProcessElement} returns. |
| * |
| * @param watermark the lower bound on the timestamps of future output for the current element |
| */ |
| public abstract void updateWatermark(Instant watermark); |
| } |
| |
| /** |
| * Information accessible when running a {@link DoFn.OnTimer} method. |
| * |
| * <p>Extends {@link WindowedContext}, so the output methods declared there are available in |
| * addition to the timer timestamp, window and time domain accessors declared here. |
| */ |
| public abstract class OnTimerContext extends WindowedContext { |
| |
| /** Returns the timestamp of the current timer. */ |
| public abstract Instant timestamp(); |
| |
| /** Returns the window in which the timer is firing. */ |
| public abstract BoundedWindow window(); |
| |
| /** Returns the time domain of the current timer. */ |
| public abstract TimeDomain timeDomain(); |
| } |
| |
| /** |
| * Reports the allowed timestamp skew: the largest amount by which a timestamp passed to {@link |
| * WindowedContext#outputWithTimestamp} may be shifted backward. |
| * |
| * <p>This implementation returns {@code Duration.ZERO}, which means timestamps can only be |
| * shifted forward, into the future. To allow infinite skew, return {@code |
| * Duration.millis(Long.MAX_VALUE)}. |
| * |
| * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These |
| * elements are considered late, and if behind the {@link Window#withAllowedLateness(Duration) |
| * allowed lateness} of a downstream {@link PCollection} may be silently dropped. See |
| * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. |
| */ |
| @Deprecated |
| public Duration getAllowedTimestampSkew() { |
| final Duration noBackwardSkew = Duration.ZERO; |
| return noBackwardSkew; |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Produces a {@link TypeDescriptor} that captures what is statically known about the input type, |
| * as seen from the most-derived class of this {@code DoFn} instance. |
| * |
| * <p>See {@link #getOutputTypeDescriptor} for more discussion. |
| */ |
| public TypeDescriptor<InputT> getInputTypeDescriptor() { |
| final Class<?> mostDerivedClass = getClass(); |
| return new TypeDescriptor<InputT>(mostDerivedClass) {}; |
| } |
| |
| /** |
| * Produces a {@link TypeDescriptor} that captures what is statically known about the output type, |
| * as seen from the most-derived class of this {@code DoFn} instance. |
| * |
| * <p>For the usual case, a concrete {@code DoFn} subclass (anonymous inner classes included) that |
| * declares no generic type parameters of its own, the result is a complete non-generic type. That |
| * makes it suitable for choosing a default output {@code Coder<O>} for the output {@code |
| * PCollection<O>}. |
| */ |
| public TypeDescriptor<OutputT> getOutputTypeDescriptor() { |
| final Class<?> mostDerivedClass = getClass(); |
| return new TypeDescriptor<OutputT>(mostDerivedClass) {}; |
| } |
| |
| /** |
| * Receives values of the given type. |
| * |
| * <p>Declares two methods: {@code output(T)}, which takes only the value, and {@code |
| * outputWithTimestamp(T, Instant)}, which additionally takes an explicit timestamp for the value. |
| * A {@link ProcessElement} method may declare a parameter of this type to output elements to the |
| * default output, and {@link MultiOutputReceiver#get} returns an instance for a given tag. |
| * |
| * <p>NOTE(review): the timestamp and window rules presumably mirror those documented on {@code |
| * WindowedContext.output} and {@code WindowedContext.outputWithTimestamp}; confirm against the |
| * runner implementations. |
| * |
| * @param <T> the type of values accepted by this receiver |
| */ |
| public interface OutputReceiver<T> { |
| void output(T output); |
| |
| void outputWithTimestamp(T output, Instant timestamp); |
| } |
| |
| /** |
| * Receives tagged output for a multi-output function. |
| * |
| * <p>A {@link ProcessElement} method may declare a parameter of this type to output to multiple |
| * tagged outputs. |
| */ |
| public interface MultiOutputReceiver { |
| /** Returns an {@link OutputReceiver} for the given tag. */ |
| <T> OutputReceiver<T> get(TupleTag<T> tag); |
| |
| /** |
| * Returns an {@link OutputReceiver} for publishing {@link Row} objects to the given tag. |
| * |
| * <p>The {@link PCollection} representing this tag must have a schema registered in order to |
| * call this function. |
| */ |
| @Experimental(Kind.SCHEMAS) |
| <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag); |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Annotation for declaring and dereferencing state cells. |
| * |
| * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link |
| * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State} |
| * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and |
| * annotate it with {@link StateId}. See the following code for an example: |
| * |
| * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} { |
| * |
| * {@literal @StateId("my-state-id")} |
| * {@literal private final StateSpec<ValueState<MyState>>} myStateSpec = |
| * StateSpecs.value(new MyStateCoder()); |
| * |
| * {@literal @ProcessElement} |
| * public void processElement( |
| * {@literal @Element InputT element}, |
| * {@literal @StateId("my-state-id") ValueState<MyState> myState}) { |
| * myState.read(); |
| * myState.write(...); |
| * } |
| * } |
| * </code></pre> |
| * |
| * <p>State is subject to the following validity conditions: |
| * |
| * <ul> |
| * <li>Each state ID must be declared at most once. |
| * <li>Any state referenced in a parameter must be declared with the same state type. |
| * <li>State declarations must be final. |
| * </ul> |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target({ElementType.FIELD, ElementType.PARAMETER}) |
| @Experimental(Kind.STATE) |
| public @interface StateId { |
| /** |
| * The state ID. A parameter refers to a declared state cell by using the same ID as the |
| * annotated {@link StateSpec} field, as in the example on this annotation. |
| */ |
| String value(); |
| } |
| |
| /** |
| * Annotation for declaring and dereferencing timers. |
| * |
| * <p>To declare a timer, create a field of type {@link TimerSpec} annotated with a {@link |
| * TimerId}. To use the timer during processing, add a parameter of the type {@link Timer} to your |
| * {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and annotate it with |
| * {@link TimerId}. See the following code for an example: |
| * |
| * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} { |
| * {@literal @TimerId("my-timer-id")} |
| * private final TimerSpec myTimer = TimerSpecs.timerForDomain(TimeDomain.EVENT_TIME); |
| * |
| * {@literal @ProcessElement} |
| * public void processElement( |
| * {@literal @Element InputT element}, |
| * {@literal @TimerId("my-timer-id") Timer myTimer}) { |
| * myTimer.offset(Duration.standardSeconds(...)).setRelative(); |
| * } |
| * |
| * {@literal @OnTimer("my-timer-id")} |
| * public void onMyTimer() { |
| * ... |
| * } |
| * }</code></pre> |
| * |
| * <p>Timers are subject to the following validity conditions: |
| * |
| * <ul> |
| * <li>Each timer must have a distinct id. |
| * <li>Any timer referenced in a parameter must be declared. |
| * <li>Timer declarations must be final. |
| * <li>All declared timers must have a corresponding callback annotated with {@link |
| * OnTimer @OnTimer}. |
| * </ul> |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target({ElementType.FIELD, ElementType.PARAMETER}) |
| @Experimental(Kind.TIMERS) |
| public @interface TimerId { |
| /** |
| * The timer ID. The same ID is used on the {@link TimerSpec} field, on {@link Timer} parameters |
| * and on the {@link OnTimer @OnTimer} callback, as in the example on this annotation. |
| */ |
| String value(); |
| } |
| |
| /** |
| * Annotation for specifying the fields that are accessed in a Schema PCollection. |
| * |
| * <p>May be placed on fields and on parameters (see the {@code @Target} on this annotation). The |
| * single {@code value()} element is a string. NOTE(review): its expected syntax (presumably a |
| * field name or field-selection expression) is not documented in this file; confirm before |
| * relying on it. |
| */ |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target({ElementType.FIELD, ElementType.PARAMETER}) |
| @Experimental(Kind.SCHEMAS) |
| public @interface FieldAccess { |
| String value(); |
| } |
| |
| /** |
| * Annotation for registering a callback for a timer. |
| * |
| * <p>See the javadoc for {@link TimerId} for use in a full example. |
| * |
| * <p>The method annotated with {@code @OnTimer} may have parameters according to the same logic |
| * as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses, |
| * and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and |
| * {@link TimerId} respectively. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.TIMERS) |
| public @interface OnTimer { |
| /** |
| * The timer ID, i.e. the ID given in the {@link TimerId} declaration of the timer that this |
| * method is the callback for. |
| */ |
| String value(); |
| } |
| |
| /** |
| * Annotation for the method to use for performing actions on window expiration. For example, |
| * users can use this annotation to write a method that extracts a value saved in a state before |
| * it gets garbage collected on window expiration. |
| * |
| * <p>This is a marker annotation: it declares no elements. |
| * |
| * <p>The method annotated with {@code @OnWindowExpiration} may have parameters according to the |
| * same logic as {@link OnTimer}. See the following code for an example: |
| * |
| * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} { |
| * |
| * {@literal @ProcessElement} |
| * public void processElement(ProcessContext c) { |
| * } |
| * |
| * {@literal @OnWindowExpiration} |
| * public void onWindowExpiration() { |
| * ... |
| * } |
| * }</code></pre> |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.STATE) |
| public @interface OnWindowExpiration {} |
| |
| /** |
| * Annotation for the method to use to prepare an instance for processing bundles of elements. |
| * |
| * <p>This is a good place to initialize transient in-memory resources, such as network |
| * connections. The resources can then be disposed in {@link Teardown}. |
| * |
| * <p>This is <b>not</b> a good place to perform external side-effects that later need cleanup, |
| * e.g. creating temporary files on distributed filesystems, starting VMs, or initiating data |
| * export jobs. Such logic must instead be implemented purely via the {@link StartBundle}, {@link |
| * ProcessElement} and {@link FinishBundle} methods; references to the objects requiring cleanup |
| * must be passed as {@link PCollection} elements, and they must be cleaned up via regular Beam |
| * transforms, e.g. see the {@link Wait} transform. |
| * |
| * <p>The method annotated with this must satisfy the following constraints: |
| * |
| * <ul> |
| * <li>It must have zero arguments. |
| * </ul> |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| public @interface Setup {} |
| |
| /** |
| * Annotation for the method to use to prepare an instance for processing a batch of elements. The |
| * method annotated with this must satisfy the following constraints: |
| * |
| * <ul> |
| * <li>If one of the parameters is of type {@link DoFn.StartBundleContext}, then it will be |
| * passed a context object for the current execution. |
| * <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a |
| * mechanism to register a callback that will be invoked after the runner successfully |
| * commits the output of this bundle. See <a |
| * href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to |
| * Finalize Bundles</a> for further details. |
| * </ul> |
| */ |
| // TODO: Add support for bundle finalization parameter (the javadoc above already describes it). |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| public @interface StartBundle {} |
| |
| /** |
| * Annotation for the method to use for processing elements. A subclass of {@link DoFn} must have |
| * a method with this annotation. |
| * |
| * <p>The signature of this method must satisfy the following constraints: |
| * |
| * <ul> |
| * <li>If one of its arguments is a {@link RestrictionTracker}, then it is a <a |
| * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} subject to the |
| * separate requirements described below. Items below are assuming this is not a splittable |
| * {@link DoFn}. |
| * <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be |
| * passed the current element being processed; the argument type must match the input type |
| * of this DoFn. |
| * <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be |
| * passed the timestamp of the current element being processed; the argument must be of type |
| * {@link Instant}. |
| * <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the |
| * window of the current element. When applied by {@link ParDo} the subtype of {@link |
| * BoundedWindow} must match the type of windows on the input {@link PCollection}. If the |
| * window is not accessed a runner may perform additional optimizations. |
| * <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information |
| * about the current triggering pane. |
| * <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the |
| * options for the current pipeline. |
| * <li>If one of the parameters is of type {@link OutputReceiver}, then it will be passed an |
| * output receiver for outputting elements to the default output. |
| * <li>If one of the parameters is of type {@link MultiOutputReceiver}, then it will be passed |
| * an output receiver for outputting to multiple tagged outputs. |
| * <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a |
| * mechanism to register a callback that will be invoked after the runner successfully |
| * commits the output of this bundle. See <a |
| * href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to |
| * Finalize Bundles</a> for further details. |
| * <li>It must return {@code void}. |
| * </ul> |
| * |
| * <h2>Splittable DoFns</h2> |
| * |
| * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter |
| * whose type is of {@link RestrictionTracker}. This is an advanced feature and an overwhelming |
| * majority of users will never need to write a splittable {@link DoFn}. |
| * |
| * <p>Not all runners support Splittable DoFn. See the <a |
| * href="https://beam.apache.org/documentation/runners/capability-matrix/">capability matrix</a>. |
| * |
| * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the |
| * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>). |
| * |
| * <p>If a {@link DoFn} is splittable, the following constraints must be respected: |
| * |
| * <ul> |
| * <li>It <i>must</i> define a {@link GetInitialRestriction} method. |
| * <li>It <i>may</i> define a {@link GetSize} method. |
| * <li>It <i>may</i> define a {@link GetPartition} method. |
| * <li>It <i>may</i> define a {@link SplitRestriction} method. |
| * <li>It <i>may</i> define a {@link NewTracker} method returning a subtype of {@code |
| * RestrictionTracker<R>} where {@code R} is the restriction type returned by {@link |
| * GetInitialRestriction}. This method is optional in case the restriction type returned by |
| * {@link GetInitialRestriction} implements {@link HasDefaultTracker}. |
| * <li>It <i>may</i> define a {@link GetRestrictionCoder} method. |
| * <li>The type of restrictions used by all of these methods must be the same. |
| * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to |
| * indicate whether there is more work to be done for the current element. |
| * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such |
| * as {@link BoundedWindow}. |
| * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or {@link |
| * UnboundedPerElement}, but not both at the same time. If it's not annotated with either of |
| * these, it's assumed to be {@link BoundedPerElement} if its {@link ProcessElement} method |
| * returns {@code void} and {@link UnboundedPerElement} if it returns a {@link |
| * ProcessContinuation}. |
| * </ul> |
| * |
| * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| public @interface ProcessElement {} |
| |
| /** |
| * Parameter annotation for the input element for a {@link ProcessElement} method. |
| * |
| * <p>As described on {@link ProcessElement}, the annotated argument is passed the current element |
| * being processed, and its type must match the input type of the {@link DoFn}. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.PARAMETER) |
| public @interface Element {} |
| |
| /** |
| * Parameter annotation for the input element timestamp for a {@link ProcessElement} method. |
| * |
| * <p>As described on {@link ProcessElement}, the annotated argument is passed the timestamp of |
| * the current element being processed, and it must be of type {@link Instant}. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.PARAMETER) |
| public @interface Timestamp {} |
| |
| /** |
| * Parameter annotation for a side input parameter of a {@link ProcessElement} method. The side |
| * input is identified by the tag ID given as the annotation value. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.PARAMETER) |
| public @interface SideInput { |
| /** The SideInput tag ID. */ |
| String value(); |
| } |
| /** |
| * <b><i>Experimental - no backwards compatibility guarantees. The exact name or usage of this |
| * feature may change.</i></b> |
| * |
| * <p>Annotation that may be added to a {@link ProcessElement}, {@link OnTimer}, or {@link |
| * OnWindowExpiration} method to indicate that the runner must ensure that the observable contents |
| * of the input {@link PCollection} or mutable state are stable upon retries. |
| * |
| * <p>This is important for sinks, which must ensure exactly-once semantics when writing to a |
| * storage medium outside of your pipeline. A general pattern for a basic sink is to write a |
| * {@link DoFn} that can perform an idempotent write, and annotate that it requires stable input. |
| * Combined, these allow the write to be freely retried until success. |
| * |
| * <p>An example of an unstable input would be anything computed using nondeterministic logic. In |
| * Beam, any user-defined function is permitted to be nondeterministic, and any {@link |
| * PCollection} is permitted to be recomputed in any manner. |
| */ |
| @Documented |
| @Experimental |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| public @interface RequiresStableInput {} |
| |
| /** |
| * Annotation for the method to use to finish processing a batch of elements. The method annotated |
| * with this must satisfy the following constraints: |
| * |
| * <ul> |
| * <li>If one of the parameters is of type {@link DoFn.FinishBundleContext}, then it will be |
| * passed a context object for the current execution. |
| * <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a |
| * mechanism to register a callback that will be invoked after the runner successfully |
| * commits the output of this bundle. See <a |
| * href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to |
| * Finalize Bundles</a> for further details. |
| * </ul> |
| * |
| * <p>Note that {@link FinishBundle @FinishBundle} is invoked before the runner commits the output |
| * while {@link BundleFinalizer.Callback bundle finalizer callbacks} are invoked after the runner |
| * has committed the output of a successful bundle. |
| * |
| * <p>As noted on the {@code output} methods of {@link DoFn.FinishBundleContext}, a splittable |
| * {@link DoFn} is not allowed to output from this method. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| public @interface FinishBundle {} |
| |
| /** |
| * Annotation for the method to use to clean up this instance before it is discarded. No other |
| * method will be called after a call to the annotated method is made. |
| * |
| * <p>This is the counterpart of {@link Setup}: transient resources initialized there can be |
| * disposed of here. |
| * |
| * <p>A runner will do its best to call this method on any given instance to prevent leaks of |
| * transient resources, however, there may be situations where this is impossible (e.g. process |
| * crash, hardware failure, etc.) or unnecessary (e.g. the pipeline is shutting down and the |
| * process is about to be killed anyway, so all transient resources will be released automatically |
| * by the OS). In these cases, the call may not happen. It will also not be retried, because in |
| * such situations the DoFn instance no longer exists, so there's no instance to retry it on. |
| * |
| * <p>Thus, all work that depends on input elements, and all externally important side effects, |
| * must be performed in the {@link ProcessElement} or {@link FinishBundle} methods. |
| * |
| * <p>Example things that are a good idea to do in this method: |
| * |
| * <ul> |
| * <li>Close a network connection that was opened in {@link Setup} |
| * <li>Shut down a helper process that was started in {@link Setup} |
| * </ul> |
| * |
| * <p>Example things that MUST NOT be done in this method: |
| * |
| * <ul> |
| * <li>Flushing a batch of buffered records to a database: this must be done in {@link |
| * FinishBundle}. |
| * <li>Deleting temporary files on a distributed filesystem: this must be done using the |
| * pipeline structure, e.g. using the {@link Wait} transform. |
| * </ul> |
| * |
| * <p>The method annotated with this must satisfy the following constraint: |
| * |
| * <ul> |
| * <li>It must have zero arguments. |
| * </ul> |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| public @interface Teardown {} |
| |
| /** |
| * Annotation for the method that maps an element to an initial restriction for a <a |
| * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. |
| * |
| * <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);} |
| * |
| * <p>As the signature shows, the annotated method takes the input element as its only parameter |
| * and returns the initial restriction for that element. |
| */ |
| // TODO: Make the InputT parameter optional. |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface GetInitialRestriction {} |
| |
| /** |
| * Annotation for the method that returns the corresponding size for an element and restriction |
| * pair. |
| * |
| * <p>Signature: {@code double getSize(InputT element, RestrictionT restriction);} |
| * |
| * <p>Returns a double representing the size of the element and restriction. |
| * |
| * <p>The size is a representation of the amount of known work. Size representations should |
| * preferably represent a linear space and be comparable within the same partition (see {@link |
| * GetPartition} for details on partition identifiers). |
| * |
| * <p>Splittable {@link DoFn}s should only provide this method if the default implementation |
| * within the {@link RestrictionTracker} is an inaccurate representation of known work. |
| * |
| * <p>It is up to each splittable {@link DoFn} to convert between its natural representation of |
| * outstanding work and this representation. For example: |
| * |
| * <ul> |
| * <li>Block based file source (e.g. Avro): From the end of the current block, the remaining |
| * number of bytes to the end of the restriction. |
| * <li>Pull based queue based source (e.g. Pubsub): The local/global size available in number of |
| * messages or number of {@code message bytes} that have not been processed. |
| * <li>Key range based source (e.g. Shuffle, Bigtable, ...): Scale the start key to be one and |
| * end key to be zero and interpolate the position of the next splittable key as the size. |
| * If information about the probability density function or cumulative distribution function |
| * is available, size interpolation can be improved. Alternatively, if the number of encoded |
| * bytes for the keys and values is known for the key range, the number of remaining bytes |
| * can be used. |
| * </ul> |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface GetSize {} |
| |
| /** |
| * Annotation for the method that returns the corresponding partition identifier for an element |
| * and restriction pair. |
| * |
| * <p>Signature: {@code byte[] getPartition(InputT element, RestrictionT restriction);} |
| * |
| * <p>Returns an immutable representation of the partition identifier as a {@code byte[]}. |
| * |
| * <p>By default, the partition identifier is represented as the encoded element and restriction |
| * pair and should only be provided if the splittable {@link DoFn} can only provide a size over a |
| * shared resource such as a message queue that potentially multiple element and restriction pairs |
| * are doing work on. The partition identifier is used by runners for various size calculations. |
| * Sizes reported with the same partition identifier represent a point in time reporting of the |
| * size for that partition. For example, a runner can compute a global size by summing all |
| * reported sizes over all unique partition identifiers while it can compute the size of a |
| * specific partition based upon the last reported value. |
| * |
| * <p>For example splittable {@link DoFn}s which consume elements from: |
| * |
| * <ul> |
| * <li>a globally shared resource such as a Pubsub queue should set this to "". |
| * <li>a shared partitioned resource should use the partition identifier. |
| * <li>a uniquely partitioned resource such as a file and offset range should not override this |
| * since the default element and restriction pair should suffice. |
| * </ul> |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface GetPartition {} |
| |
| /** |
| * Annotation for the method that returns the coder to use for the restriction of a <a |
| * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. |
| * |
| * <p>If not defined, a coder will be inferred using standard coder inference rules and the |
| * pipeline's {@link Pipeline#getCoderRegistry coder registry}. |
| * |
| * <p>This method will be called only at pipeline construction time. |
| * |
| * <p>Signature: {@code Coder<RestrictionT> getRestrictionCoder();} |
| * |
| * <p>As the signature shows, the annotated method takes no parameters. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface GetRestrictionCoder {} |
| |
| /** |
| * Annotation for the method that splits the restriction of a <a |
| * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to |
| * be processed in parallel. |
| * |
| * <p>Signature: {@code void splitRestriction(InputT element, RestrictionT restriction, |
| * OutputReceiver<RestrictionT> receiver);} |
| * |
| * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to |
| * defining the method and outputting the {@code restriction} unchanged). |
| */ |
| // TODO: Make the InputT parameter optional. |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface SplitRestriction {} |
| |
| /** |
| * Annotation for the method that creates a new {@link RestrictionTracker} for the restriction of |
| * a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. |
| * |
| * <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction);} where {@code |
| * MyRestrictionTracker} must be a subtype of {@code RestrictionTracker<RestrictionT>}. |
| * |
| * <p>As the signature shows, the annotated method takes the restriction as its only parameter. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.METHOD) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface NewTracker {} |
| |
| /** |
| * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} |
| * specifying that the {@link DoFn} performs a bounded amount of work per input element, so |
| * applying it to a bounded {@link PCollection} will also produce a bounded {@link PCollection}. |
| * |
| * <p>It is an error to specify this on a non-splittable {@link DoFn}. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.TYPE) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface BoundedPerElement {} |
| |
| /** |
| * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} |
| * specifying that the {@link DoFn} performs an unbounded amount of work per input element, so |
| * applying it to a bounded {@link PCollection} will produce an unbounded {@link PCollection}. |
| * |
| * <p>It is an error to specify this on a non-splittable {@link DoFn}. |
| */ |
| @Documented |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.TYPE) |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public @interface UnboundedPerElement {} |
| |
| // Shared stop() result. It can't be put into ProcessContinuation itself due to this problem: |
| // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html |
| private static final ProcessContinuation PROCESS_CONTINUATION_STOP = |
| new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO); |
| |
| /** |
| * When used as a return value of {@link ProcessElement}, indicates whether there is more work to |
| * be done for the current element. |
| * |
| * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call on |
| * the {@link RestrictionTracker}, then the call MUST return {@link #stop()}. |
| * |
| * <p>The factory methods {@link #stop()} and {@link #resume()} both use a zero resume delay; |
| * {@link #withResumeDelay} derives an instance with a different delay. |
| */ |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| @AutoValue |
| public abstract static class ProcessContinuation { |
| /** |
| * Indicates that there is no more work to be done for the current element. |
| * |
| * @return the shared {@code PROCESS_CONTINUATION_STOP} instance, whose {@link #shouldResume()} |
| * is {@code false} and whose {@link #resumeDelay()} is {@code Duration.ZERO} |
| */ |
| public static ProcessContinuation stop() { |
| return PROCESS_CONTINUATION_STOP; |
| } |
| |
| /** |
| * Indicates that there is more work to be done for the current element. |
| * |
| * @return a newly created instance whose {@link #shouldResume()} is {@code true} and whose |
| * {@link #resumeDelay()} is {@code Duration.ZERO} |
| */ |
| public static ProcessContinuation resume() { |
| return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO); |
| } |
| |
| /** |
| * If false, the {@link DoFn} promises that there is no more work remaining for the current |
| * element, so the runner should not resume the {@link ProcessElement} call. |
| */ |
| public abstract boolean shouldResume(); |
| |
| /** |
| * A minimum duration that should elapse between the end of this {@link ProcessElement} call and |
| * the {@link ProcessElement} call continuing processing of the same element. By default, zero. |
| */ |
| public abstract Duration resumeDelay(); |
| |
| /** |
| * Builder method to set the value of {@link #resumeDelay()}. |
| * |
| * @param resumeDelay the value to use for {@link #resumeDelay()} |
| * @return a newly created instance with the same {@link #shouldResume()} as this one and the |
| * given resume delay; this instance is not modified |
| */ |
| public ProcessContinuation withResumeDelay(Duration resumeDelay) { |
| return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay); |
| } |
| } |
| |
| /** |
| * Finalize the {@link DoFn} construction to prepare for processing. This method should be called |
| * by runners before any processing methods. |
| * |
| * <p>The implementation in this class has an empty body and cannot be overridden (the method is |
| * {@code final}). |
| * |
| * @deprecated use {@link Setup} or {@link StartBundle} instead. This method will be removed in a |
| * future release. |
| */ |
| @Deprecated |
| public final void prepareForProcessing() {} |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>By default, does not register any display data. Implementors may override this method to |
| * provide their own display data. |
| * |
| * @param builder the display data builder; left untouched by this default implementation |
| */ |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) {} |
| |
| /** |
| * A parameter that is accessible during {@link StartBundle @StartBundle}, {@link |
| * ProcessElement @ProcessElement} and {@link FinishBundle @FinishBundle} that allows the caller |
| * to register a callback that will be invoked after the bundle has been successfully completed |
| * and the runner has committed the output. |
| * |
| * <p>A common usage would be to perform any acknowledgements required by an external system such |
| * as acking messages from a message queue since this callback is only invoked after the output of |
| * the bundle has been durably persisted by the runner. |
| * |
| * <p>Note that a runner may make the output of the bundle available immediately to downstream |
| * consumers without waiting for finalization to succeed. Pipelines that are sensitive to |
| * duplicate messages must perform output deduplication in the pipeline. |
| */ |
| // TODO: Add support for a deduplication PTransform. |
| @Experimental(Kind.SPLITTABLE_DO_FN) |
| public interface BundleFinalizer { |
| /** |
| * The provided function will be called after the runner successfully commits the output of a |
| * successful bundle. Throwing during finalization represents that bundle finalization may have |
| * failed and the runner may choose to attempt finalization again. The provided {@code |
| * callbackExpiry} controls how long the finalization is valid for before it is garbage |
| * collected and no longer able to be invoked. |
| * |
| * <p>Note that finalization is best effort and it is expected that the external system will |
| * self recover state if finalization never happens or consistently fails. For example, a queue |
| * based system that requires message acknowledgement would replay messages if that |
| * acknowledgement was never received within the provided time bound. |
| * |
| * <p>See <a href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: |
| * How to Finalize Bundles</a> for further details. |
| * |
| * @param callbackExpiry The instant at which the finalization callback expires. If the runner |
| * cannot commit results and execute the callback before this instant, the callback will not |
| * be invoked. |
| * @param callback The finalization callback method for the runner to invoke after processing |
| * results have been successfully committed. |
| */ |
| void afterBundleCommit(Instant callbackExpiry, Callback callback); |
| |
| /** |
| * An instance of a function that will be invoked after bundle finalization. |
| * |
| * <p>Note that this function should maintain all state necessary outside of a DoFn's context to |
| * be able to perform bundle finalization and should not rely on mutable state stored within a |
| * DoFn instance. |
| * |
| * <p>The single method, {@code onBundleSuccess()}, is declared to throw {@link Exception}. As |
| * described on {@link BundleFinalizer#afterBundleCommit}, throwing represents that bundle |
| * finalization may have failed, and the runner may choose to attempt finalization again. |
| */ |
| @FunctionalInterface |
| interface Callback { |
| void onBundleSuccess() throws Exception; |
| } |
| } |
| } |