/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * The argument to {@link ParDo} providing the code to use to process elements of the input {@link
 * org.apache.beam.sdk.values.PCollection}.
 *
 * <p>See {@link ParDo} for more explanation, examples of use, and discussion of constraints on
 * {@code DoFn}s, including their serializability, lack of access to global shared mutable state,
 * requirements for failure tolerance, and benefits of optimization.
 *
 * <p>{@link DoFn DoFns} can be tested by using {@link TestPipeline}. You can verify their
 * functional correctness in a local test using the {@code DirectRunner} as well as running
 * integration tests with your production runner of choice. Typically, you can generate the input
 * data using {@link Create#of} or other transforms. However, if you need to test the behavior of
 * {@link StartBundle} and {@link FinishBundle} with particular bundle boundaries, you can use
 * {@link TestStream}.
 *
 * <p>Implementations must define a method annotated with {@link ProcessElement} that satisfies the
 * requirements described there. See the {@link ProcessElement} for details.
 *
 * <p>Example usage:
 *
 * <pre><code>
 * {@literal PCollection<String>} lines = ... ;
 * {@literal PCollection<String>} words =
 *     {@literal lines.apply(ParDo.of(new DoFn<String, String>())} {
 *         {@literal @ProcessElement}
 *          public void processElement({@literal @}Element String element, BoundedWindow window) {
 *            ...
 *          }}));
 * </code></pre>
 *
 * @param <InputT> the type of the (main) input elements
 * @param <OutputT> the type of the (main) output elements
 */
public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
  /** Information accessible while within the {@link StartBundle} method. */
  @SuppressWarnings("ClassCanBeStatic") // Converting class to static is an API change.
  public abstract class StartBundleContext {
    /**
     * Returns the {@code PipelineOptions} specified with the {@link
     * org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}.
     */
    public abstract PipelineOptions getPipelineOptions();
  }

  /** Information accessible while within the {@link FinishBundle} method. */
  public abstract class FinishBundleContext {
    /**
     * Returns the {@code PipelineOptions} specified with the {@link
     * org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}.
     */
    public abstract PipelineOptions getPipelineOptions();

    /**
     * Adds the given element to the main output {@code PCollection} at the given timestamp in the
     * given window.
     *
     * <p>Once passed to {@code output} the element should not be modified in any way.
     *
     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from the {@link
     * FinishBundle} method.
     */
    public abstract void output(@Nullable OutputT output, Instant timestamp, BoundedWindow window);

    /**
     * Adds the given element to the output {@code PCollection} with the given tag at the given
     * timestamp in the given window.
     *
     * <p>Once passed to {@code output} the element should not be modified in any way.
     *
     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from the {@link
     * FinishBundle} method.
     */
    public abstract <T> void output(
        TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window);
  }

  /**
   * Information accessible to all methods in this {@link DoFn} where the context is in some window.
   */
  public abstract class WindowedContext {
    /**
     * Returns the {@code PipelineOptions} specified with the {@link
     * org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}.
     */
    public abstract PipelineOptions getPipelineOptions();

    /**
     * Adds the given element to the main output {@code PCollection}.
     *
     * <p>Once passed to {@code output} the element should not be modified in any way.
     *
     * <p>If invoked from {@link ProcessElement}, the output element will have the same timestamp
     * and be in the same windows as the input element passed to the method annotated with
     * {@code @ProcessElement}.
     *
     * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the
     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to
     * determine what windows the element should be in, throwing an exception if the {@code
     * WindowFn} attempts to access any information about the input element. The output element will
     * have a timestamp of negative infinity.
     *
     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
     * or {@link FinishBundle} methods.
     */
    public abstract void output(OutputT output);

    /**
     * Adds the given element to the main output {@code PCollection}, with the given timestamp.
     *
     * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way.
     *
     * <p>If invoked from {@link ProcessElement}), the timestamp must not be older than the input
     * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in
     * the same windows as the input element.
     *
     * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the
     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to
     * determine what windows the element should be in, throwing an exception if the {@code
     * WindowFn} attempts to access any information about the input element except for the
     * timestamp.
     *
     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
     * or {@link FinishBundle} methods.
     */
    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);

    /**
     * Adds the given element to the output {@code PCollection} with the given tag.
     *
     * <p>Once passed to {@code output} the element should not be modified in any way.
     *
     * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags} to specify the
     * tags of outputs that it consumes. Non-consumed outputs, e.g., outputs for monitoring purposes
     * only, don't necessarily need to be specified.
     *
     * <p>The output element will have the same timestamp and be in the same windows as the input
     * element passed to {@link ProcessElement}).
     *
     * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the
     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to
     * determine what windows the element should be in, throwing an exception if the {@code
     * WindowFn} attempts to access any information about the input element. The output element will
     * have a timestamp of negative infinity.
     *
     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
     * or {@link FinishBundle} methods.
     *
     * @see ParDo.SingleOutput#withOutputTags
     */
    public abstract <T> void output(TupleTag<T> tag, T output);

    /**
     * Adds the given element to the specified output {@code PCollection}, with the given timestamp.
     *
     * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way.
     *
     * <p>If invoked from {@link ProcessElement}), the timestamp must not be older than the input
     * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in
     * the same windows as the input element.
     *
     * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the
     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to
     * determine what windows the element should be in, throwing an exception if the {@code
     * WindowFn} attempts to access any information about the input element except for the
     * timestamp.
     *
     * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
     * or {@link FinishBundle} methods.
     *
     * @see ParDo.SingleOutput#withOutputTags
     */
    public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
  }

  /** Information accessible when running a {@link DoFn.ProcessElement} method. */
  public abstract class ProcessContext extends WindowedContext {

    /**
     * Returns the input element to be processed.
     *
     * <p>The element will not be changed -- it is safe to cache, etc. without copying.
     * Implementation of {@link DoFn.ProcessElement} method should not mutate the element.
     */
    public abstract InputT element();

    /**
     * Returns the value of the side input.
     *
     * @throws IllegalArgumentException if this is not a side input
     * @see ParDo.SingleOutput#withSideInputs
     */
    public abstract <T> T sideInput(PCollectionView<T> view);

    /**
     * Returns the timestamp of the input element.
     *
     * <p>See {@link Window} for more information.
     */
    public abstract Instant timestamp();

    /**
     * Returns information about the pane within this window into which the input element has been
     * assigned.
     *
     * <p>Generally all data is in a single, uninteresting pane unless custom triggering and/or late
     * data has been explicitly requested. See {@link Window} for more information.
     */
    public abstract PaneInfo pane();

    /**
     * Gives the runner a (best-effort) lower bound about the timestamps of future output associated
     * with the current element.
     *
     * <p>If the {@link DoFn} has multiple outputs, the watermark applies to all of them.
     *
     * <p>Only splittable {@link DoFn DoFns} are allowed to call this method. It is safe to call
     * this method from a different thread than the one running {@link ProcessElement}, but all
     * calls must finish before {@link ProcessElement} returns.
     */
    public abstract void updateWatermark(Instant watermark);
  }

  /** Information accessible when running a {@link DoFn.OnTimer} method. */
  public abstract class OnTimerContext extends WindowedContext {

    /** Returns the timestamp of the current timer. */
    public abstract Instant timestamp();

    /** Returns the window in which the timer is firing. */
    public abstract BoundedWindow window();

    /** Returns the time domain of the current timer. */
    public abstract TimeDomain timeDomain();
  }

  /**
   * Returns the allowed timestamp skew duration, which is the maximum duration that timestamps can
   * be shifted backward in {@link WindowedContext#outputWithTimestamp}.
   *
   * <p>The default value is {@code Duration.ZERO}, in which case timestamps can only be shifted
   * forward to future. For infinite skew, return {@code Duration.millis(Long.MAX_VALUE)}.
   *
   * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These
   *     elements are considered late, and if behind the {@link Window#withAllowedLateness(Duration)
   *     allowed lateness} of a downstream {@link PCollection} may be silently dropped. See
   *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
   */
  @Deprecated
  public Duration getAllowedTimestampSkew() {
    return Duration.ZERO;
  }

  /////////////////////////////////////////////////////////////////////////////

  /**
   * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of
   * this {@code DoFn} instance's most-derived class.
   *
   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
   */
  public TypeDescriptor<InputT> getInputTypeDescriptor() {
    return new TypeDescriptor<InputT>(getClass()) {};
  }

  /**
   * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of
   * this {@code DoFn} instance's most-derived class.
   *
   * <p>In the normal case of a concrete {@code DoFn} subclass with no generic type parameters of
   * its own (including anonymous inner classes), this will be a complete non-generic type, which is
   * good for choosing a default output {@code Coder<O>} for the output {@code PCollection<O>}.
   */
  public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
    return new TypeDescriptor<OutputT>(getClass()) {};
  }

  /** Receives values of the given type. */
  public interface OutputReceiver<T> {
    void output(T output);

    void outputWithTimestamp(T output, Instant timestamp);
  }

  /** Receives tagged output for a multi-output function. */
  public interface MultiOutputReceiver {
    /** Returns an {@link OutputReceiver} for the given tag. * */
    <T> OutputReceiver<T> get(TupleTag<T> tag);

    /**
     * Returns a {@link OutputReceiver} for publishing {@link Row} objects to the given tag.
     *
     * <p>The {@link PCollection} representing this tag must have a schema registered in order to
     * call this function.
     */
    @Experimental(Kind.SCHEMAS)
    <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag);
  }

  /////////////////////////////////////////////////////////////////////////////

  /**
   * Annotation for declaring and dereferencing state cells.
   *
   * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link
   * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State}
   * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and
   * annotate it with {@link StateId}. See the following code for an example:
   *
   * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
   *
   *  {@literal @StateId("my-state-id")}
   *  {@literal private final StateSpec<ValueState<MyState>>} myStateSpec =
   *       StateSpecs.value(new MyStateCoder());
   *
   *  {@literal @ProcessElement}
   *   public void processElement(
   *       {@literal @Element InputT element},
   *      {@literal @StateId("my-state-id") ValueState<MyState> myState}) {
   *     myState.read();
   *     myState.write(...);
   *   }
   * }
   * </code></pre>
   *
   * <p>State is subject to the following validity conditions:
   *
   * <ul>
   *   <li>Each state ID must be declared at most once.
   *   <li>Any state referenced in a parameter must be declared with the same state type.
   *   <li>State declarations must be final.
   * </ul>
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.FIELD, ElementType.PARAMETER})
  @Experimental(Kind.STATE)
  public @interface StateId {
    /** The state ID. */
    String value();
  }

  /**
   * Annotation for declaring and dereferencing timers.
   *
   * <p>To declare a timer, create a field of type {@link TimerSpec} annotated with a {@link
   * TimerId}. To use the cell during processing, add a parameter of the type {@link Timer} to your
   * {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and annotate it with
   * {@link TimerId}. See the following code for an example:
   *
   * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
   *   {@literal @TimerId("my-timer-id")}
   *    private final TimerSpec myTimer = TimerSpecs.timerForDomain(TimeDomain.EVENT_TIME);
   *
   *   {@literal @ProcessElement}
   *    public void processElement(
   *       {@literal @Element InputT element},
   *       {@literal @TimerId("my-timer-id") Timer myTimer}) {
   *      myTimer.offset(Duration.standardSeconds(...)).setRelative();
   *    }
   *
   *   {@literal @OnTimer("my-timer-id")}
   *    public void onMyTimer() {
   *      ...
   *    }
   * }</code></pre>
   *
   * <p>Timers are subject to the following validity conditions:
   *
   * <ul>
   *   <li>Each timer must have a distinct id.
   *   <li>Any timer referenced in a parameter must be declared.
   *   <li>Timer declarations must be final.
   *   <li>All declared timers must have a corresponding callback annotated with {@link
   *       OnTimer @OnTimer}.
   * </ul>
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.FIELD, ElementType.PARAMETER})
  @Experimental(Kind.TIMERS)
  public @interface TimerId {
    /** The timer ID. */
    String value();
  }

  /** Annotation for specifying specific fields that are accessed in a Schema PCollection. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.FIELD, ElementType.PARAMETER})
  @Experimental(Kind.SCHEMAS)
  public @interface FieldAccess {
    String value();
  }

  /**
   * Annotation for registering a callback for a timer.
   *
   * <p>See the javadoc for {@link TimerId} for use in a full example.
   *
   * <p>The method annotated with {@code @OnTimer} may have parameters according to the same logic
   * as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses,
   * and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and
   * {@link TimerId} respectively.
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.TIMERS)
  public @interface OnTimer {
    /** The timer ID. */
    String value();
  }

  /**
   * Annotation for the method to use for performing actions on window expiration. For example,
   * users can use this annotation to write a method that extracts a value saved in a state before
   * it gets garbage collected on window expiration.
   *
   * <p>The method annotated with {@code @OnWindowExpiration} may have parameters according to the
   * same logic as {@link OnTimer}. See the following code for an example:
   *
   * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
   *
   *   {@literal @ProcessElement}
   *    public void processElement(ProcessContext c) {
   *    }
   *
   *   {@literal @OnWindowExpiration}
   *    public void onWindowExpiration() {
   *      ...
   *    }
   * }</code></pre>
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.STATE)
  public @interface OnWindowExpiration {}

  /**
   * Annotation for the method to use to prepare an instance for processing bundles of elements.
   *
   * <p>This is a good place to initialize transient in-memory resources, such as network
   * connections. The resources can then be disposed in {@link Teardown}.
   *
   * <p>This is <b>not</b> a good place to perform external side-effects that later need cleanup,
   * e.g. creating temporary files on distributed filesystems, starting VMs, or initiating data
   * export jobs. Such logic must be instead implemented purely via {@link StartBundle}, {@link
   * ProcessElement} and {@link FinishBundle} methods, references to the objects requiring cleanup
   * must be passed as {@link PCollection} elements, and they must be cleaned up via regular Beam
   * transforms, e.g. see the {@link Wait} transform.
   *
   * <p>The method annotated with this must satisfy the following constraints:
   *
   * <ul>
   *   <li>It must have zero arguments.
   * </ul>
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface Setup {}

  /**
   * Annotation for the method to use to prepare an instance for processing a batch of elements. The
   * method annotated with this must satisfy the following constraints:
   *
   * <ul>
   *   <li>If one of the parameters is of type {@link DoFn.StartBundleContext}, then it will be
   *       passed a context object for the current execution.
   *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
   *       mechanism to register a callback that will be invoked after the runner successfully
   *       commits the output of this bundle. See <a
   *       href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
   *       Finalize Bundles</a> for further details.
   * </ul>
   */
  // TODO: Add support for bundle finalization parameter.
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface StartBundle {}

  /**
   * Annotation for the method to use for processing elements. A subclass of {@link DoFn} must have
   * a method with this annotation.
   *
   * <p>The signature of this method must satisfy the following constraints:
   *
   * <ul>
   *   <li>If one of its arguments is a {@link RestrictionTracker}, then it is a <a
   *       href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} subject to the
   *       separate requirements described below. Items below are assuming this is not a splittable
   *       {@link DoFn}.
   *   <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
   *       passed the current element being processed; the argument type must match the input type
   *       of this DoFn.
   *   <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
   *       passed the timestamp of the current element being processed; the argument must be of type
   *       {@link Instant}.
   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
   *       window of the current element. When applied by {@link ParDo} the subtype of {@link
   *       BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
   *       window is not accessed a runner may perform additional optimizations.
   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
   *       about the current triggering pane.
   *   <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
   *       options for the current pipeline.
   *   <li>If one of the parameters is of type {@link OutputReceiver}, then it will be passed an
   *       output receiver for outputting elements to the default output.
   *   <li>If one of the parameters is of type {@link MultiOutputReceiver}, then it will be passed
   *       an output receiver for outputting to multiple tagged outputs.
   *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
   *       mechanism to register a callback that will be invoked after the runner successfully
   *       commits the output of this bundle. See <a
   *       href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
   *       Finalize Bundles</a> for further details.
   *   <li>It must return {@code void}.
   * </ul>
   *
   * <h2>Splittable DoFn's</h2>
   *
   * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter
   * whose type is of {@link RestrictionTracker}. This is an advanced feature and an overwhelming
   * majority of users will never need to write a splittable {@link DoFn}.
   *
   * <p>Not all runners support Splittable DoFn. See the <a
   * href="https://beam.apache.org/documentation/runners/capability-matrix/">capability matrix</a>.
   *
   * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the
   * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>).
   *
   * <p>If a {@link DoFn} is splittable, the following constraints must be respected:
   *
   * <ul>
   *   <li>It <i>must</i> define a {@link GetInitialRestriction} method.
   *   <li>It <i>may</i> define a {@link GetSize} method.
   *   <li>It <i>may</i> define a {@link GetPartitition} method.
   *   <li>It <i>may</i> define a {@link SplitRestriction} method.
   *   <li>It <i>may</i> define a {@link NewTracker} method returning a subtype of {@code
   *       RestrictionTracker<R>} where {@code R} is the restriction type returned by {@link
   *       GetInitialRestriction}. This method is optional in case the restriction type returned by
   *       {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
   *   <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
   *   <li>The type of restrictions used by all of these methods must be the same.
   *   <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
   *       indicate whether there is more work to be done for the current element.
   *   <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such
   *       as {@link BoundedWindow}.
   *   <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or {@link
   *       UnboundedPerElement}, but not both at the same time. If it's not annotated with either of
   *       these, it's assumed to be {@link BoundedPerElement} if its {@link ProcessElement} method
   *       returns {@code void} and {@link UnboundedPerElement} if it returns a {@link
   *       ProcessContinuation}.
   * </ul>
   *
   * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface ProcessElement {}

  /** Parameter annotation for the input element for a {@link ProcessElement} method. */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  public @interface Element {}

  /** Parameter annotation for the input element timestamp for a {@link ProcessElement} method. */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  public @interface Timestamp {}

  /** Parameter annotation for the SideInput for a {@link ProcessElement} method. */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  public @interface SideInput {
    /** The SideInput tag ID. */
    String value();
  }
  /**
   * <b><i>Experimental - no backwards compatibility guarantees. The exact name or usage of this
   * feature may change.</i></b>
   *
   * <p>Annotation that may be added to a {@link ProcessElement}, {@link OnTimer}, or {@link
   * OnWindowExpiration} method to indicate that the runner must ensure that the observable contents
   * of the input {@link PCollection} or mutable state must be stable upon retries.
   *
   * <p>This is important for sinks, which must ensure exactly-once semantics when writing to a
   * storage medium outside of your pipeline. A general pattern for a basic sink is to write a
   * {@link DoFn} that can perform an idempotent write, and annotate that it requires stable input.
   * Combined, these allow the write to be freely retried until success.
   *
   * <p>An example of an unstable input would be anything computed using nondeterministic logic. In
   * Beam, any user-defined function is permitted to be nondeterministic, and any {@link
   * PCollection} is permitted to be recomputed in any manner.
   */
  @Documented
  @Experimental
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface RequiresStableInput {}

  /**
   * Annotation for the method to use to finish processing a batch of elements. The method annotated
   * with this must satisfy the following constraints:
   *
   * <ul>
   *   <li>If one of the parameters is of type {@link DoFn.FinishBundleContext}, then it will be
   *       passed a context object for the current execution.
   *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
   *       mechanism to register a callback that will be invoked after the runner successfully
   *       commits the output of this bundle. See <a
   *       href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
   *       Finalize Bundles</a> for further details.
   * </ul>
   *
   * <p>Note that {@link FinishBundle @FinishBundle} is invoked before the runner commits the output
   * while {@link BundleFinalizer.Callback bundle finalizer callbacks} are invoked after the runner
   * has committed the output of a successful bundle.
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface FinishBundle {}

  /**
   * Annotation for the method to use to clean up this instance before it is discarded. No other
   * method will be called after a call to the annotated method is made.
   *
   * <p>A runner will do its best to call this method on any given instance to prevent leaks of
   * transient resources, however, there may be situations where this is impossible (e.g. process
   * crash, hardware failure, etc.) or unnecessary (e.g. the pipeline is shutting down and the
   * process is about to be killed anyway, so all transient resources will be released automatically
   * by the OS). In these cases, the call may not happen. It will also not be retried, because in
   * such situations the DoFn instance no longer exists, so there's no instance to retry it on.
   *
   * <p>Thus, all work that depends on input elements, and all externally important side effects,
   * must be performed in the {@link ProcessElement} or {@link FinishBundle} methods.
   *
   * <p>Example things that are a good idea to do in this method:
   *
   * <ul>
   *   <li>Close a network connection that was opened in {@link Setup}
   *   <li>Shut down a helper process that was started in {@link Setup}
   * </ul>
   *
   * <p>Example things that MUST NOT be done in this method:
   *
   * <ul>
   *   <li>Flushing a batch of buffered records to a database: this must be done in {@link
   *       FinishBundle}.
   *   <li>Deleting temporary files on a distributed filesystem: this must be done using the
   *       pipeline structure, e.g. using the {@link Wait} transform.
   * </ul>
   *
   * <p>The method annotated with this must satisfy the following constraint:
   *
   * <ul>
   *   <li>It must have zero arguments.
   * </ul>
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface Teardown {}

  /**
   * Annotation for the method that maps an element to an initial restriction for a <a
   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
   *
   * <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);}
   */
  // TODO: Make the InputT parameter optional.
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface GetInitialRestriction {}

  /**
   * Annotation for the method that returns the corresponding size for an element and restriction
   * pair.
   *
   * <p>Signature: {@code double getSize(InputT element, RestrictionT restriction);}
   *
   * <p>Returns a double representing the size of the element and restriction.
   *
   * <p>A representation for the amount of known work represented as a size. Size representations
   * should preferably represent a linear space and be comparable within the same partition (see
   * {@link GetPartition} for details on partition identifiers}).
   *
   * <p>Splittable {@link DoFn}s should only provide this method if the default implementation
   * within the {@link RestrictionTracker} is an inaccurate representation of known work.
   *
   * <p>It is up to each splittable {@DoFn} to convert between their natural representation of
   * outstanding work and this representation. For example:
   *
   * <ul>
   *   <li>Block based file source (e.g. Avro): From the end of the current block, the remaining
   *       number of bytes to the end of the restriction.
   *   <li>Pull based queue based source (e.g. Pubsub): The local/global size available in number of
   *       messages or number of {@code message bytes} that have not been processed.
   *   <li>Key range based source (e.g. Shuffle, Bigtable, ...): Scale the start key to be one and
   *       end key to be zero and interpolate the position of the next splittable key as the size.
   *       If information about the probability density function or cumulative distribution function
   *       is available, size interpolation can be improved. Alternatively, if the number of encoded
   *       bytes for the keys and values is known for the key range, the number of remaining bytes
   *       can be used.
   * </ul>
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface GetSize {}

  /**
   * Annotation for the method that returns the corresponding partition identifier for an element
   * and restriction pair.
   *
   * <p>Signature: {@code byte[] getPartitition(InputT element, RestrictionT restriction);}
   *
   * <p>Returns an immutable representation of the partition identifier as a byte[].
   *
   * <p>By default, the partition identifier is represented as the encoded element and restriction
   * pair and should only be provided if the splittable {@link DoFn} can only provide a size over a
   * shared resource such as a message queue that potentially multiple element and restriction pairs
   * are doing work on. The partition identifier is used by runners for various size calculations.
   * Sizes reported with the same partition identifier represent a point in time reporting of the
   * size for that partition. For example, a runner can compute a global size by summing all
   * reported sizes over all unique partition identifiers while it can compute the size of a
   * specific partition based upon the last reported value.
   *
   * <p>For example splittable {@link DoFn}s which consume elements from:
   *
   * <ul>
   *   <li>a globally shared resource such as a Pubsub queue should set this to "".
   *   <li>a shared partitioned resource should use the partition identifier.
   *   <li>a uniquely partitioned resource such as a file and offset range should not override this
   *       since the default element and restriction pair should suffice.
   * </ul>
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface GetPartition {}

  /**
   * Annotation for the method that returns the coder to use for the restriction of a <a
   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
   *
   * <p>If not defined, a coder will be inferred using standard coder inference rules and the
   * pipeline's {@link Pipeline#getCoderRegistry coder registry}.
   *
   * <p>This method will be called only at pipeline construction time.
   *
   * <p>Signature: {@code Coder<RestrictionT> getRestrictionCoder();}
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface GetRestrictionCoder {}

  /**
   * Annotation for the method that splits restriction of a <a
   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
   * be processed in parallel.
   *
   * <p>Signature: {@code List<RestrictionT> splitRestriction( InputT element, RestrictionT
   * restriction);}
   *
   * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
   * defining the method and returning {@code Collections.singletonList(restriction)}).
   */
  // TODO: Make the InputT parameter optional.
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface SplitRestriction {}

  /**
   * Annotation for the method that creates a new {@link RestrictionTracker} for the restriction of
   * a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
   *
   * <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction);} where {@code
   * MyRestrictionTracker} must be a subtype of {@code RestrictionTracker<RestrictionT>}.
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface NewTracker {}

  /**
   * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
   * specifying that the {@link DoFn} performs a bounded amount of work per input element, so
   * applying it to a bounded {@link PCollection} will produce also a bounded {@link PCollection}.
   * It is an error to specify this on a non-splittable {@link DoFn}.
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface BoundedPerElement {}

  /**
   * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
   * specifying that the {@link DoFn} performs an unbounded amount of work per input element, so
   * applying it to a bounded {@link PCollection} will produce an unbounded {@link PCollection}. It
   * is an error to specify this on a non-splittable {@link DoFn}.
   */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public @interface UnboundedPerElement {}

  // This can't be put into ProcessContinuation itself due to the following problem:
  // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
  private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
      new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);

  /**
   * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
   * be done for the current element.
   *
   * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call on
   * the {@link RestrictionTracker}, then the call MUST return {@link #stop()}.
   */
  @Experimental(Kind.SPLITTABLE_DO_FN)
  @AutoValue
  public abstract static class ProcessContinuation {
    /** Indicates that there is no more work to be done for the current element. */
    public static ProcessContinuation stop() {
      return PROCESS_CONTINUATION_STOP;
    }

    /** Indicates that there is more work to be done for the current element. */
    public static ProcessContinuation resume() {
      return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
    }

    /**
     * If false, the {@link DoFn} promises that there is no more work remaining for the current
     * element, so the runner should not resume the {@link ProcessElement} call.
     */
    public abstract boolean shouldResume();

    /**
     * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
     * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
     */
    public abstract Duration resumeDelay();

    /** Builder method to set the value of {@link #resumeDelay()}. */
    public ProcessContinuation withResumeDelay(Duration resumeDelay) {
      return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay);
    }
  }

  /**
   * Finalize the {@link DoFn} construction to prepare for processing. This method should be called
   * by runners before any processing methods.
   *
   * @deprecated use {@link Setup} or {@link StartBundle} instead. This method will be removed in a
   *     future release.
   */
  @Deprecated
  public final void prepareForProcessing() {}

  /**
   * {@inheritDoc}
   *
   * <p>By default, does not register any display data. Implementors may override this method to
   * provide their own display data.
   */
  @Override
  public void populateDisplayData(DisplayData.Builder builder) {}

  /**
   * A parameter that is accessible during {@link StartBundle @StartBundle}, {@link
   * ProcessElement @ProcessElement} and {@link FinishBundle @FinishBundle} that allows the caller
   * to register a callback that will be invoked after the bundle has been successfully completed
   * and the runner has commit the output.
   *
   * <p>A common usage would be to perform any acknowledgements required by an external system such
   * as acking messages from a message queue since this callback is only invoked after the output of
   * the bundle has been durably persisted by the runner.
   *
   * <p>Note that a runner may make the output of the bundle available immediately to downstream
   * consumers without waiting for finalization to succeed. For pipelines that are sensitive to
   * duplicate messages, they must perform output deduplication in the pipeline.
   */
  // TODO: Add support for a deduplication PTransform.
  @Experimental(Kind.SPLITTABLE_DO_FN)
  public interface BundleFinalizer {
    /**
     * The provided function will be called after the runner successfully commits the output of a
     * successful bundle. Throwing during finalization represents that bundle finalization may have
     * failed and the runner may choose to attempt finalization again. The provided {@code
     * callbackExpiry} controls how long the finalization is valid for before it is garbage
     * collected and no longer able to be invoked.
     *
     * <p>Note that finalization is best effort and it is expected that the external system will
     * self recover state if finalization never happens or consistently fails. For example, a queue
     * based system that requires message acknowledgement would replay messages if that
     * acknowledgement was never received within the provided time bound.
     *
     * <p>See <a href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API:
     * How to Finalize Bundles</a> for further details.
     *
     * @param callbackExpiry When the finalization callback expires. If the runner cannot commit
     *     results and execute the callback within this duration, the callback will not be invoked.
     * @param callback The finalization callback method for the runner to invoke after processing
     *     results have been successfully committed.
     */
    void afterBundleCommit(Instant callbackExpiry, Callback callback);

    /**
     * An instance of a function that will be invoked after bundle finalization.
     *
     * <p>Note that this function should maintain all state necessary outside of a DoFn's context to
     * be able to perform bundle finalization and should not rely on mutable state stored within a
     * DoFn instance.
     */
    @FunctionalInterface
    interface Callback {
      void onBundleSuccess() throws Exception;
    }
  }
}
