blob: 1ea3547d9199cb104ae42069b38de001ca9b2486 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.reflect;
import com.google.auto.value.AutoValue;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SideInputParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
/**
* Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
* context it requires, types of the input and output elements, etc.
*
* <p>See <a href="https://s.apache.org/a-new-dofn">A new DoFn</a>.
*/
@AutoValue
public abstract class DoFnSignature {
/** Class of the original {@link DoFn} from which this signature was produced. */
public abstract Class<? extends DoFn<?, ?>> fnClass();
/** Whether this {@link DoFn} does a bounded amount of work per element. */
public abstract PCollection.IsBounded isBoundedPerElement();
/** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */
public abstract ProcessElementMethod processElement();
/** Details about the state cells that this {@link DoFn} declares. Immutable. */
public abstract Map<String, StateDeclaration> stateDeclarations();
/** Details about this {@link DoFn}'s {@link DoFn.StartBundle} method. */
@Nullable
public abstract BundleMethod startBundle();
/** Details about this {@link DoFn}'s {@link DoFn.FinishBundle} method. */
@Nullable
public abstract BundleMethod finishBundle();
/** Details about this {@link DoFn}'s {@link DoFn.Setup} method. */
@Nullable
public abstract LifecycleMethod setup();
/** Details about this {@link DoFn}'s {@link DoFn.Teardown} method. */
@Nullable
public abstract LifecycleMethod teardown();
/** Details about this {@link DoFn}'s {@link DoFn.OnWindowExpiration} method. */
@Nullable
public abstract OnWindowExpirationMethod onWindowExpiration();
/** Timer declarations present on the {@link DoFn} class. Immutable. */
public abstract Map<String, TimerDeclaration> timerDeclarations();
/** TimerMap declarations present on the {@link DoFn} class. Immutable. */
public abstract Map<String, TimerFamilyDeclaration> timerFamilyDeclarations();
/** Field access declaration. */
@Nullable
public abstract Map<String, FieldAccessDeclaration> fieldAccessDeclarations();
/** Details about this {@link DoFn}'s {@link DoFn.GetInitialRestriction} method. */
@Nullable
public abstract GetInitialRestrictionMethod getInitialRestriction();
/** Details about this {@link DoFn}'s {@link DoFn.SplitRestriction} method. */
@Nullable
public abstract SplitRestrictionMethod splitRestriction();
/** Details about this {@link DoFn}'s {@link DoFn.GetRestrictionCoder} method. */
@Nullable
public abstract GetRestrictionCoderMethod getRestrictionCoder();
/** Details about this {@link DoFn}'s {@link DoFn.NewTracker} method. */
@Nullable
public abstract NewTrackerMethod newTracker();
/** Details about this {@link DoFn}'s {@link DoFn.OnTimer} methods. */
@Nullable
public abstract Map<String, OnTimerMethod> onTimerMethods();
/** Details about this {@link DoFn}'s {@link DoFn.OnTimerFamily} methods. */
@Nullable
public abstract Map<String, OnTimerFamilyMethod> onTimerFamilyMethods();
/** @deprecated use {@link #usesState()}, it's cleaner */
@Deprecated
public boolean isStateful() {
return stateDeclarations().size() > 0;
}
/** Whether the {@link DoFn} described by this signature uses state. */
public boolean usesState() {
return stateDeclarations().size() > 0;
}
/** Whether the {@link DoFn} described by this signature uses timers. */
public boolean usesTimers() {
return timerDeclarations().size() > 0 || timerFamilyDeclarations().size() > 0;
}
static Builder builder() {
return new AutoValue_DoFnSignature.Builder();
}
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFnClass(Class<? extends DoFn<?, ?>> fnClass);
abstract Builder setIsBoundedPerElement(PCollection.IsBounded isBounded);
abstract Builder setProcessElement(ProcessElementMethod processElement);
abstract Builder setStartBundle(BundleMethod startBundle);
abstract Builder setFinishBundle(BundleMethod finishBundle);
abstract Builder setSetup(LifecycleMethod setup);
abstract Builder setTeardown(LifecycleMethod teardown);
abstract Builder setOnWindowExpiration(OnWindowExpirationMethod onWindowExpiration);
abstract Builder setGetInitialRestriction(GetInitialRestrictionMethod getInitialRestriction);
abstract Builder setSplitRestriction(SplitRestrictionMethod splitRestriction);
abstract Builder setGetRestrictionCoder(GetRestrictionCoderMethod getRestrictionCoder);
abstract Builder setNewTracker(NewTrackerMethod newTracker);
abstract Builder setStateDeclarations(Map<String, StateDeclaration> stateDeclarations);
abstract Builder setTimerDeclarations(Map<String, TimerDeclaration> timerDeclarations);
abstract Builder setTimerFamilyDeclarations(
Map<String, TimerFamilyDeclaration> timerFamilyDeclarations);
abstract Builder setFieldAccessDeclarations(
Map<String, FieldAccessDeclaration> fieldAccessDeclaration);
abstract Builder setOnTimerMethods(Map<String, OnTimerMethod> onTimerMethods);
abstract Builder setOnTimerFamilyMethods(Map<String, OnTimerFamilyMethod> onTimerFamilyMethods);
abstract DoFnSignature build();
}
/** A method delegated to an annotated method of an underlying {@link DoFn}. */
public interface DoFnMethod {
/** The annotated method itself. */
Method targetMethod();
}
/**
* A method delegated to an annotated method of an underlying {@link DoFn} that accepts a dynamic
* list of parameters.
*/
public interface MethodWithExtraParameters extends DoFnMethod {
/**
* Types of optional parameters of the annotated method, in the order they appear.
*
* <p>Validation that these are allowed is external to this class.
*/
List<Parameter> extraParameters();
/** The type of window expected by this method, if any. */
@Nullable
TypeDescriptor<? extends BoundedWindow> windowT();
}
/** A descriptor for an optional parameter of the {@link DoFn.ProcessElement} method. */
public abstract static class Parameter {
// Private as no extensions other than those nested here are permitted
private Parameter() {}
/**
* Performs case analysis on this {@link Parameter}, processing it with the appropriate {@link
* Cases#dispatch} case of the provided {@link Cases} object.
*/
public <ResultT> ResultT match(Cases<ResultT> cases) {
// This could be done with reflection, but since the number of cases is small and known,
// they are simply inlined.
if (this instanceof StartBundleContextParameter) {
return cases.dispatch((StartBundleContextParameter) this);
} else if (this instanceof FinishBundleContextParameter) {
return cases.dispatch((FinishBundleContextParameter) this);
} else if (this instanceof ProcessContextParameter) {
return cases.dispatch((ProcessContextParameter) this);
} else if (this instanceof OnTimerContextParameter) {
return cases.dispatch((OnTimerContextParameter) this);
} else if (this instanceof WindowParameter) {
return cases.dispatch((WindowParameter) this);
} else if (this instanceof PaneInfoParameter) {
return cases.dispatch((PaneInfoParameter) this);
} else if (this instanceof RestrictionTrackerParameter) {
return cases.dispatch((RestrictionTrackerParameter) this);
} else if (this instanceof StateParameter) {
return cases.dispatch((StateParameter) this);
} else if (this instanceof TimerParameter) {
return cases.dispatch((TimerParameter) this);
} else if (this instanceof PipelineOptionsParameter) {
return cases.dispatch((PipelineOptionsParameter) this);
} else if (this instanceof ElementParameter) {
return cases.dispatch((ElementParameter) this);
} else if (this instanceof SchemaElementParameter) {
return cases.dispatch((SchemaElementParameter) this);
} else if (this instanceof TimestampParameter) {
return cases.dispatch((TimestampParameter) this);
} else if (this instanceof OutputReceiverParameter) {
return cases.dispatch((OutputReceiverParameter) this);
} else if (this instanceof TaggedOutputReceiverParameter) {
return cases.dispatch((TaggedOutputReceiverParameter) this);
} else if (this instanceof TimeDomainParameter) {
return cases.dispatch((TimeDomainParameter) this);
} else if (this instanceof SideInputParameter) {
return cases.dispatch((SideInputParameter) this);
} else if (this instanceof TimerFamilyParameter) {
return cases.dispatch((TimerFamilyParameter) this);
} else if (this instanceof TimerIdParameter) {
return cases.dispatch((TimerIdParameter) this);
} else {
throw new IllegalStateException(
String.format(
"Attempt to case match on unknown %s subclass %s",
Parameter.class.getCanonicalName(), this.getClass().getCanonicalName()));
}
}
/** An interface for destructuring a {@link Parameter}. */
public interface Cases<ResultT> {
ResultT dispatch(StartBundleContextParameter p);
ResultT dispatch(FinishBundleContextParameter p);
ResultT dispatch(ProcessContextParameter p);
ResultT dispatch(ElementParameter p);
ResultT dispatch(SchemaElementParameter p);
ResultT dispatch(TimestampParameter p);
ResultT dispatch(TimeDomainParameter p);
ResultT dispatch(OutputReceiverParameter p);
ResultT dispatch(TaggedOutputReceiverParameter p);
ResultT dispatch(OnTimerContextParameter p);
ResultT dispatch(WindowParameter p);
ResultT dispatch(PaneInfoParameter p);
ResultT dispatch(RestrictionTrackerParameter p);
ResultT dispatch(StateParameter p);
ResultT dispatch(TimerParameter p);
ResultT dispatch(PipelineOptionsParameter p);
ResultT dispatch(SideInputParameter p);
ResultT dispatch(TimerFamilyParameter p);
ResultT dispatch(TimerIdParameter p);
/** A base class for a visitor with a default method for cases it is not interested in. */
abstract class WithDefault<ResultT> implements Cases<ResultT> {
protected abstract ResultT dispatchDefault(Parameter p);
@Override
public ResultT dispatch(StartBundleContextParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(FinishBundleContextParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(ProcessContextParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(ElementParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(SchemaElementParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(TaggedOutputReceiverParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(OutputReceiverParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(TimestampParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(TimerIdParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(TimeDomainParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(OnTimerContextParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(WindowParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(PaneInfoParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(RestrictionTrackerParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(StateParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(TimerParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(PipelineOptionsParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(SideInputParameter p) {
return dispatchDefault(p);
}
@Override
public ResultT dispatch(TimerFamilyParameter p) {
return dispatchDefault(p);
}
}
}
// These parameter descriptors are constant.
private static final StartBundleContextParameter START_BUNDLE_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_StartBundleContextParameter();
private static final FinishBundleContextParameter FINISH_BUNDLE_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_FinishBundleContextParameter();
private static final ProcessContextParameter PROCESS_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_ProcessContextParameter();
private static final OnTimerContextParameter ON_TIMER_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_OnTimerContextParameter();
private static final TimestampParameter TIMESTAMP_PARAMETER =
new AutoValue_DoFnSignature_Parameter_TimestampParameter();
private static final TimerIdParameter TIMER_ID_PARAMETER =
new AutoValue_DoFnSignature_Parameter_TimerIdParameter();
private static final PaneInfoParameter PANE_INFO_PARAMETER =
new AutoValue_DoFnSignature_Parameter_PaneInfoParameter();
private static final TimeDomainParameter TIME_DOMAIN_PARAMETER =
new AutoValue_DoFnSignature_Parameter_TimeDomainParameter();
private static final TaggedOutputReceiverParameter TAGGED_OUTPUT_RECEIVER_PARAMETER =
new AutoValue_DoFnSignature_Parameter_TaggedOutputReceiverParameter();
private static final PipelineOptionsParameter PIPELINE_OPTIONS_PARAMETER =
new AutoValue_DoFnSignature_Parameter_PipelineOptionsParameter();
/** Returns a {@link ProcessContextParameter}. */
public static ProcessContextParameter processContext() {
return PROCESS_CONTEXT_PARAMETER;
}
public static ElementParameter elementParameter(TypeDescriptor<?> elementT) {
return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT);
}
public static SchemaElementParameter schemaElementParameter(
TypeDescriptor<?> elementT, @Nullable String fieldAccessString, int index) {
return new AutoValue_DoFnSignature_Parameter_SchemaElementParameter.Builder()
.setElementT(elementT)
.setFieldAccessString(fieldAccessString)
.setIndex(index)
.build();
}
public static TimestampParameter timestampParameter() {
return TIMESTAMP_PARAMETER;
}
public static TimerIdParameter timerIdParameter() {
return TIMER_ID_PARAMETER;
}
public static SideInputParameter sideInputParameter(
TypeDescriptor<?> elementT, String sideInputId) {
return new AutoValue_DoFnSignature_Parameter_SideInputParameter.Builder()
.setElementT(elementT)
.setSideInputId(sideInputId)
.build();
}
public static TimeDomainParameter timeDomainParameter() {
return TIME_DOMAIN_PARAMETER;
}
public static OutputReceiverParameter outputReceiverParameter(boolean rowReceiver) {
return new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter(rowReceiver);
}
public static TaggedOutputReceiverParameter taggedOutputReceiverParameter() {
return TAGGED_OUTPUT_RECEIVER_PARAMETER;
}
/** Returns a {@link OnTimerContextParameter}. */
public static OnTimerContextParameter onTimerContext() {
return ON_TIMER_CONTEXT_PARAMETER;
}
public static PaneInfoParameter paneInfoParameter() {
return PANE_INFO_PARAMETER;
}
/** Returns a {@link WindowParameter}. */
public static WindowParameter boundedWindow(TypeDescriptor<? extends BoundedWindow> windowT) {
return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
}
/** Returns a {@link PipelineOptionsParameter}. */
public static PipelineOptionsParameter pipelineOptions() {
return PIPELINE_OPTIONS_PARAMETER;
}
/** Returns a {@link RestrictionTrackerParameter}. */
public static RestrictionTrackerParameter restrictionTracker(TypeDescriptor<?> trackerT) {
return new AutoValue_DoFnSignature_Parameter_RestrictionTrackerParameter(trackerT);
}
/** Returns a {@link StateParameter} referring to the given {@link StateDeclaration}. */
public static StateParameter stateParameter(StateDeclaration decl) {
return new AutoValue_DoFnSignature_Parameter_StateParameter(decl);
}
public static TimerParameter timerParameter(TimerDeclaration decl) {
return new AutoValue_DoFnSignature_Parameter_TimerParameter(decl);
}
public static TimerFamilyParameter timerFamilyParameter(TimerFamilyDeclaration decl) {
return new AutoValue_DoFnSignature_Parameter_TimerFamilyParameter(decl);
}
/** Descriptor for a {@link Parameter} of a subtype of {@link PipelineOptions}. */
@AutoValue
public abstract static class PipelineOptionsParameter extends Parameter {
PipelineOptionsParameter() {}
}
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.StartBundleContext}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class StartBundleContextParameter extends Parameter {
StartBundleContextParameter() {}
}
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.FinishBundleContext}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class FinishBundleContextParameter extends Parameter {
FinishBundleContextParameter() {}
}
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.ProcessContext}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class ProcessContextParameter extends Parameter {
ProcessContextParameter() {}
}
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.Element}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class ElementParameter extends Parameter {
ElementParameter() {}
public abstract TypeDescriptor<?> elementT();
}
/**
* Descriptor for a (@link Parameter} of type {@link DoFn.Element} where the type does not match
* the DoFn's input type. This implies that the input must have a schema that is compatible.
*/
@AutoValue
public abstract static class SchemaElementParameter extends Parameter {
SchemaElementParameter() {}
public abstract TypeDescriptor<?> elementT();
@Nullable
public abstract String fieldAccessString();
public abstract int index();
/** Builder class. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setElementT(TypeDescriptor<?> elementT);
public abstract Builder setFieldAccessString(@Nullable String fieldAccess);
public abstract Builder setIndex(int index);
public abstract SchemaElementParameter build();
}
public abstract Builder toBuilder();
}
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.Timestamp}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class TimestampParameter extends Parameter {
TimestampParameter() {}
}
@AutoValue
public abstract static class TimerIdParameter extends Parameter {
TimerIdParameter() {}
}
/**
* Descriptor for a {@link Parameter} representing the time domain of a timer.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class TimeDomainParameter extends Parameter {
TimeDomainParameter() {}
}
/** Descriptor for a {@link Parameter} of type {@link DoFn.SideInput}. */
@AutoValue
public abstract static class SideInputParameter extends Parameter {
SideInputParameter() {}
public abstract TypeDescriptor<?> elementT();
public abstract String sideInputId();
/** Builder class. */
@AutoValue.Builder
public abstract static class Builder {
public abstract SideInputParameter.Builder setElementT(TypeDescriptor<?> elementT);
public abstract SideInputParameter.Builder setSideInputId(String sideInput);
public abstract SideInputParameter build();
}
public abstract SideInputParameter.Builder toBuilder();
}
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.OutputReceiver}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class OutputReceiverParameter extends Parameter {
OutputReceiverParameter() {}
public abstract boolean isRowReceiver();
}
/**
* Descriptor for a {@link Parameter} of type {@link MultiOutputReceiver}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class TaggedOutputReceiverParameter extends Parameter {
TaggedOutputReceiverParameter() {}
}
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.OnTimerContext}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class OnTimerContextParameter extends Parameter {
OnTimerContextParameter() {}
}
/**
* Descriptor for a {@link Parameter} of type {@link BoundedWindow}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class WindowParameter extends Parameter {
WindowParameter() {}
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
}
/**
* Descriptor for a {@link Parameter} of type {@link
* org.apache.beam.sdk.transforms.windowing.PaneInfo}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class PaneInfoParameter extends Parameter {
PaneInfoParameter() {}
}
/**
* Descriptor for a {@link Parameter} of a subclass of {@link RestrictionTracker}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class RestrictionTrackerParameter extends Parameter {
// Package visible for AutoValue
RestrictionTrackerParameter() {}
public abstract TypeDescriptor<?> trackerT();
}
/**
* Descriptor for a {@link Parameter} of a subclass of {@link State}, with an id indicated by
* its {@link StateId} annotation.
*
* <p>All descriptors for the same declared state are equal.
*/
@AutoValue
public abstract static class StateParameter extends Parameter {
// Package visible for AutoValue
StateParameter() {}
public abstract StateDeclaration referent();
}
/**
* Descriptor for a {@link Parameter} of type {@link Timer}, with an id indicated by its {@link
* TimerId} annotation.
*/
@AutoValue
public abstract static class TimerParameter extends Parameter {
// Package visible for AutoValue
TimerParameter() {}
public abstract TimerDeclaration referent();
}
/** Descriptor for a {@link Parameter} of type {@link DoFn.TimerFamily}. */
@AutoValue
public abstract static class TimerFamilyParameter extends Parameter {
// Package visible for AutoValue
TimerFamilyParameter() {}
public abstract TimerFamilyDeclaration referent();
}
}
/** Describes a {@link DoFn.ProcessElement} method. */
@AutoValue
public abstract static class ProcessElementMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();
/**
* Whether this method requires stable input, expressed via {@link
* org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}.
*/
public abstract boolean requiresStableInput();
/** Concrete type of the {@link RestrictionTracker} parameter, if present. */
@Nullable
public abstract TypeDescriptor<?> trackerT();
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
/** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
public abstract boolean hasReturnValue();
static ProcessElementMethod create(
Method targetMethod,
List<Parameter> extraParameters,
boolean requiresStableInput,
TypeDescriptor<?> trackerT,
@Nullable TypeDescriptor<? extends BoundedWindow> windowT,
boolean hasReturnValue) {
return new AutoValue_DoFnSignature_ProcessElementMethod(
targetMethod,
Collections.unmodifiableList(extraParameters),
requiresStableInput,
trackerT,
windowT,
hasReturnValue);
}
/**
* Whether this {@link DoFn} observes - directly or indirectly - the window that an element
* resides in.
*
* <p>{@link State} and {@link Timer} parameters indirectly observe the window, because they are
* each scoped to a single window.
*/
public boolean observesWindow() {
return extraParameters().stream()
.anyMatch(
Predicates.or(
Predicates.instanceOf(WindowParameter.class),
Predicates.instanceOf(TimerParameter.class),
Predicates.instanceOf(StateParameter.class))
::apply);
}
@Nullable
public List<SchemaElementParameter> getSchemaElementParameters() {
return extraParameters().stream()
.filter(Predicates.instanceOf(SchemaElementParameter.class)::apply)
.map(SchemaElementParameter.class::cast)
.collect(Collectors.toList());
}
@Nullable
public List<SideInputParameter> getSideInputParameters() {
return extraParameters().stream()
.filter(Predicates.instanceOf(SideInputParameter.class)::apply)
.map(SideInputParameter.class::cast)
.collect(Collectors.toList());
}
/** The {@link OutputReceiverParameter} for a main output, or null if there is none. */
@Nullable
public OutputReceiverParameter getMainOutputReceiver() {
Optional<Parameter> parameter =
extraParameters().stream()
.filter(Predicates.instanceOf(OutputReceiverParameter.class)::apply)
.findFirst();
return parameter.isPresent() ? ((OutputReceiverParameter) parameter.get()) : null;
}
/**
* Whether this {@link DoFn} is <a href="https://s.apache.org/splittable-do-fn">splittable</a>.
*/
public boolean isSplittable() {
return extraParameters().stream()
.anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
}
}
/** Describes a {@link DoFn.OnTimer} method. */
@AutoValue
public abstract static class OnTimerMethod implements MethodWithExtraParameters {
/** The id on the method's {@link DoFn.TimerId} annotation. */
public abstract String id();
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/**
* Whether this method requires stable input, expressed via {@link
* org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For timers, this means that any
* state must be stably persisted prior to calling it.
*/
public abstract boolean requiresStableInput();
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();
static OnTimerMethod create(
Method targetMethod,
String id,
boolean requiresStableInput,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_OnTimerMethod(
id,
targetMethod,
requiresStableInput,
windowT,
Collections.unmodifiableList(extraParameters));
}
}
/** Describes a {@link DoFn.OnTimerFamily} method. */
@AutoValue
public abstract static class OnTimerFamilyMethod implements MethodWithExtraParameters {
/** The id on the method's {@link DoFn.TimerId} annotation. */
public abstract String id();
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/**
* Whether this method requires stable input, expressed via {@link
* org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For timers, this means that any
* state must be stably persisted prior to calling it.
*/
public abstract boolean requiresStableInput();
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();
static OnTimerFamilyMethod create(
Method targetMethod,
String id,
boolean requiresStableInput,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_OnTimerFamilyMethod(
id,
targetMethod,
requiresStableInput,
windowT,
Collections.unmodifiableList(extraParameters));
}
}
/** Describes a {@link DoFn.OnWindowExpiration} method. */
@AutoValue
public abstract static class OnWindowExpirationMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/**
* Whether this method requires stable input, expressed via {@link
* org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For {@link
* org.apache.beam.sdk.transforms.DoFn.OnWindowExpiration}, this means that any state must be
* stably persisted prior to calling it.
*/
public abstract boolean requiresStableInput();
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();
static OnWindowExpirationMethod create(
Method targetMethod,
boolean requiresStableInput,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_OnWindowExpirationMethod(
targetMethod,
requiresStableInput,
windowT,
Collections.unmodifiableList(extraParameters));
}
}
/**
* Describes a timer declaration; a field of type {@link TimerSpec} annotated with {@link
* DoFn.TimerId}.
*/
@AutoValue
public abstract static class TimerDeclaration {
public abstract String id();
public abstract Field field();
static TimerDeclaration create(String id, Field field) {
return new AutoValue_DoFnSignature_TimerDeclaration(id, field);
}
}
/**
* Describes a timer family declaration; a field of type {@link TimerSpec} annotated with {@link
* DoFn.TimerFamily}.
*/
@AutoValue
public abstract static class TimerFamilyDeclaration {
public abstract String id();
public abstract Field field();
static TimerFamilyDeclaration create(String id, Field field) {
return new AutoValue_DoFnSignature_TimerFamilyDeclaration(id, field);
}
}
/** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
@AutoValue
public abstract static class BundleMethod implements DoFnMethod {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
static BundleMethod create(Method targetMethod) {
return new AutoValue_DoFnSignature_BundleMethod(targetMethod);
}
}
/**
* Describes a state declaration; a field of type {@link StateSpec} annotated with {@link
* DoFn.StateId}.
*/
@AutoValue
public abstract static class StateDeclaration {
public abstract String id();
public abstract Field field();
public abstract TypeDescriptor<? extends State> stateType();
static StateDeclaration create(
String id, Field field, TypeDescriptor<? extends State> stateType) {
field.setAccessible(true);
return new AutoValue_DoFnSignature_StateDeclaration(id, field, stateType);
}
}
/**
* Decscribes a field access declaration. This is used when the input {@link PCollection} has an
* associated schema, to specify exactly which fields in the row are accessed. Any fields not
* specified are not guaranteed to be present when reading the row.
*/
@AutoValue
public abstract static class FieldAccessDeclaration {
public abstract String id();
public abstract Field field();
static FieldAccessDeclaration create(String id, Field field) {
field.setAccessible(true);
return new AutoValue_DoFnSignature_FieldAccessDeclaration(id, field);
}
}
/** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
@AutoValue
public abstract static class LifecycleMethod implements DoFnMethod {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
static LifecycleMethod create(Method targetMethod) {
return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
}
}
/** Describes a {@link DoFn.GetInitialRestriction} method. */
@AutoValue
public abstract static class GetInitialRestrictionMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/** Type of the returned restriction. */
public abstract TypeDescriptor<?> restrictionT();
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();
static GetInitialRestrictionMethod create(
Method targetMethod,
TypeDescriptor<?> restrictionT,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(
targetMethod, restrictionT, windowT, extraParameters);
}
}
/** Describes a {@link DoFn.SplitRestriction} method. */
@AutoValue
public abstract static class SplitRestrictionMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/** Type of the restriction taken and returned. */
public abstract TypeDescriptor<?> restrictionT();
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();
static SplitRestrictionMethod create(
Method targetMethod,
TypeDescriptor<?> restrictionT,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_SplitRestrictionMethod(
targetMethod, restrictionT, windowT, extraParameters);
}
}
/** Describes a {@link DoFn.NewTracker} method. */
@AutoValue
public abstract static class NewTrackerMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/** Type of the input restriction. */
public abstract TypeDescriptor<?> restrictionT();
/** Type of the returned {@link RestrictionTracker}. */
public abstract TypeDescriptor<?> trackerT();
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();
static NewTrackerMethod create(
Method targetMethod,
TypeDescriptor<?> restrictionT,
TypeDescriptor<?> trackerT,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_NewTrackerMethod(
targetMethod, restrictionT, trackerT, windowT, extraParameters);
}
}
/** Describes a {@link DoFn.GetRestrictionCoder} method. */
@AutoValue
public abstract static class GetRestrictionCoderMethod implements DoFnMethod {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
/** Type of the returned {@link Coder}. */
public abstract TypeDescriptor<?> coderT();
static GetRestrictionCoderMethod create(Method targetMethod, TypeDescriptor<?> coderT) {
return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
}
}
}