blob: f5a559ce74abb998caea2a7b88b027ca69a81f8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
/**
* Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
*
* <p>Also, if the {@link DoFn} observes the window of the element, then {@link SimpleDoFnRunner}
* explodes windows of the input {@link WindowedValue} and calls {@link DoFn.ProcessElement} for
* each window individually.
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
/** The {@link DoFn} being run. */
private final DoFn<InputT, OutputT> fn;
/** The {@link DoFnInvoker} being run. */
private final DoFnInvoker<InputT, OutputT> invoker;
/** The context used for running the {@link DoFn}. */
private final DoFnContext<InputT, OutputT> context;
private final OutputManager outputManager;
private final TupleTag<OutputT> mainOutputTag;
private final boolean observesWindow;
private final DoFnSignature signature;
private final Coder<BoundedWindow> windowCoder;
private final Duration allowedLateness;
// Because of setKey(Object), we really must refresh stateInternals() at each access
private final StepContext stepContext;
public SimpleDoFnRunner(
PipelineOptions options,
DoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = fn;
this.signature = DoFnSignatures.getSignature(fn.getClass());
this.observesWindow = signature.processElement().observesWindow() || !sideInputReader.isEmpty();
this.invoker = DoFnInvokers.invokerFor(fn);
this.outputManager = outputManager;
this.mainOutputTag = mainOutputTag;
this.stepContext = stepContext;
// This is a cast of an _invariant_ coder. But we are assured by pipeline validation
// that it really is the coder for whatever BoundedWindow subclass is provided
@SuppressWarnings("unchecked")
Coder<BoundedWindow> untypedCoder =
(Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder();
this.windowCoder = untypedCoder;
this.allowedLateness = windowingStrategy.getAllowedLateness();
this.context =
new DoFnContext<>(
options,
fn,
sideInputReader,
outputManager,
mainOutputTag,
sideOutputTags,
stepContext,
aggregatorFactory,
windowingStrategy.getWindowFn());
}
@Override
public void startBundle() {
// This can contain user code. Wrap it in case it throws an exception.
try {
invoker.invokeStartBundle(context);
} catch (Throwable t) {
// Exception in user code.
throw wrapUserCodeException(t);
}
}
@Override
public void processElement(WindowedValue<InputT> compressedElem) {
if (observesWindow) {
for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
invokeProcessElement(elem);
}
} else {
invokeProcessElement(compressedElem);
}
}
@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
// The effective timestamp is when derived elements will have their timestamp set, if not
// otherwise specified. If this is an event time timer, then they have the timestamp of the
// timer itself. Otherwise, they are set to the input timestamp, which is by definition
// non-late.
Instant effectiveTimestamp;
switch (timeDomain) {
case EVENT_TIME:
effectiveTimestamp = timestamp;
break;
case PROCESSING_TIME:
case SYNCHRONIZED_PROCESSING_TIME:
effectiveTimestamp = context.stepContext.timerInternals().currentInputWatermarkTime();
break;
default:
throw new IllegalArgumentException(
String.format("Unknown time domain: %s", timeDomain));
}
OnTimerArgumentProvider<InputT, OutputT> argumentProvider =
new OnTimerArgumentProvider<>(
fn, context, window, allowedLateness, effectiveTimestamp, timeDomain);
invoker.invokeOnTimer(timerId, argumentProvider);
}
private void invokeProcessElement(WindowedValue<InputT> elem) {
final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem);
// This can contain user code. Wrap it in case it throws an exception.
try {
invoker.invokeProcessElement(processContext);
} catch (Exception ex) {
throw wrapUserCodeException(ex);
}
}
@Override
public void finishBundle() {
// This can contain user code. Wrap it in case it throws an exception.
try {
invoker.invokeFinishBundle(context);
} catch (Throwable t) {
// Exception in user code.
throw wrapUserCodeException(t);
}
}
/** Returns a new {@link DoFn.ProcessContext} for the given element. */
private DoFnProcessContext<InputT, OutputT> createProcessContext(WindowedValue<InputT> elem) {
return new DoFnProcessContext<InputT, OutputT>(fn, context, elem, allowedLateness);
}
private RuntimeException wrapUserCodeException(Throwable t) {
throw UserCodeException.wrapIf(!isSystemDoFn(), t);
}
private boolean isSystemDoFn() {
return invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class);
}
/**
* A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
private static final int MAX_SIDE_OUTPUTS = 1000;
final PipelineOptions options;
final DoFn<InputT, OutputT> fn;
final SideInputReader sideInputReader;
final OutputManager outputManager;
final TupleTag<OutputT> mainOutputTag;
final StepContext stepContext;
final AggregatorFactory aggregatorFactory;
final WindowFn<?, ?> windowFn;
/**
* The set of known output tags, some of which may be undeclared, so we can throw an exception
* when it exceeds {@link #MAX_SIDE_OUTPUTS}.
*/
private Set<TupleTag<?>> outputTags;
public DoFnContext(
PipelineOptions options,
DoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowFn<?, ?> windowFn) {
fn.super();
this.options = options;
this.fn = fn;
this.sideInputReader = sideInputReader;
this.outputManager = outputManager;
this.mainOutputTag = mainOutputTag;
this.outputTags = Sets.newHashSet();
outputTags.add(mainOutputTag);
for (TupleTag<?> sideOutputTag : sideOutputTags) {
outputTags.add(sideOutputTag);
}
this.stepContext = stepContext;
this.aggregatorFactory = aggregatorFactory;
this.windowFn = windowFn;
super.setupDelegateAggregators();
}
//////////////////////////////////////////////////////////////////////////////
@Override
public PipelineOptions getPipelineOptions() {
return options;
}
<T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
final Instant inputTimestamp = timestamp;
if (timestamp == null) {
timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
}
if (windows == null) {
try {
// The windowFn can never succeed at accessing the element, so its type does not
// matter here
@SuppressWarnings("unchecked")
WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
windows =
objectWindowFn.assignWindows(
objectWindowFn.new AssignContext() {
@Override
public Object element() {
throw new UnsupportedOperationException(
"WindowFn attempted to access input element when none was available");
}
@Override
public Instant timestamp() {
if (inputTimestamp == null) {
throw new UnsupportedOperationException(
"WindowFn attempted to access input timestamp when none was available");
}
return inputTimestamp;
}
@Override
public W window() {
throw new UnsupportedOperationException(
"WindowFn attempted to access input windows when none were available");
}
});
} catch (Exception e) {
throw UserCodeException.wrap(e);
}
}
return WindowedValue.of(output, timestamp, windows, pane);
}
public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
if (!sideInputReader.contains(view)) {
throw new IllegalArgumentException("calling sideInput() with unknown view");
}
return sideInputReader.get(view, sideInputWindow);
}
void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
}
void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
outputManager.output(mainOutputTag, windowedElem);
if (stepContext != null) {
stepContext.noteOutput(windowedElem);
}
}
private <T> void sideOutputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
}
private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
if (!outputTags.contains(tag)) {
// This tag wasn't declared nor was it seen before during this execution.
// Thus, this must be a new, undeclared and unconsumed output.
// To prevent likely user errors, enforce the limit on the number of side
// outputs.
if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
throw new IllegalArgumentException(
"the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
}
outputTags.add(tag);
}
outputManager.output(tag, windowedElem);
if (stepContext != null) {
stepContext.noteSideOutput(tag, windowedElem);
}
}
// Following implementations of output, outputWithTimestamp, and sideOutput
// are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
// ProcessContext's versions in DoFn.processElement.
@Override
public void output(OutputT output) {
outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
}
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
}
@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
}
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
}
@Override
protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
checkNotNull(combiner, "Combiner passed to createAggregator cannot be null");
return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
}
@Override
public BoundedWindow window() {
throw new UnsupportedOperationException(
"Cannot access window outside of @ProcessElement and @OnTimer methods.");
}
@Override
public Context context(DoFn<InputT, OutputT> doFn) {
return this;
}
@Override
public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access ProcessContext outside of @Processelement method.");
}
@Override
public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access OnTimerContext outside of @OnTimer methods.");
}
@Override
public RestrictionTracker<?> restrictionTracker() {
throw new UnsupportedOperationException(
"Cannot access RestrictionTracker outside of @ProcessElement method.");
}
@Override
public State state(String stateId) {
throw new UnsupportedOperationException(
"Cannot access state outside of @ProcessElement and @OnTimer methods.");
}
@Override
public Timer timer(String timerId) {
throw new UnsupportedOperationException(
"Cannot access timers outside of @ProcessElement and @OnTimer methods.");
}
}
/**
* A concrete implementation of {@link DoFn.ProcessContext} used for running a {@link DoFn} over a
* single element.
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
private class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext
implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
final DoFn<InputT, OutputT> fn;
final DoFnContext<InputT, OutputT> context;
final WindowedValue<InputT> windowedValue;
private final Duration allowedLateness;
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
@Nullable private StateNamespace namespace;
/**
* The state namespace for this context.
*
* <p>Any call to {@link #getNamespace()} when more than one window is present will crash; this
* represents a bug in the runner or the {@link DoFnSignature}, since values must be in exactly
* one window when state or timers are relevant.
*/
private StateNamespace getNamespace() {
if (namespace == null) {
namespace = StateNamespaces.window(windowCoder, window());
}
return namespace;
}
private DoFnProcessContext(
DoFn<InputT, OutputT> fn,
DoFnContext<InputT, OutputT> context,
WindowedValue<InputT> windowedValue,
Duration allowedLateness) {
fn.super();
this.fn = fn;
this.context = context;
this.windowedValue = windowedValue;
this.allowedLateness = allowedLateness;
}
@Override
public PipelineOptions getPipelineOptions() {
return context.getPipelineOptions();
}
@Override
public InputT element() {
return windowedValue.getValue();
}
@Override
public <T> T sideInput(PCollectionView<T> view) {
checkNotNull(view, "View passed to sideInput cannot be null");
Iterator<? extends BoundedWindow> windowIter = windows().iterator();
BoundedWindow window;
if (!windowIter.hasNext()) {
if (context.windowFn instanceof GlobalWindows) {
// TODO: Remove this once GroupByKeyOnly no longer outputs elements
// without windows
window = GlobalWindow.INSTANCE;
} else {
throw new IllegalStateException(
"sideInput called when main input element is not in any windows");
}
} else {
window = windowIter.next();
if (windowIter.hasNext()) {
throw new IllegalStateException(
"sideInput called when main input element is in multiple windows");
}
}
return context.sideInput(
view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
}
@Override
public PaneInfo pane() {
return windowedValue.getPane();
}
@Override
public void output(OutputT output) {
context.outputWindowedValue(windowedValue.withValue(output));
}
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
checkTimestamp(timestamp);
context.outputWindowedValue(
output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
}
@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
checkNotNull(tag, "Tag passed to sideOutput cannot be null");
context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
}
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
checkTimestamp(timestamp);
context.sideOutputWindowedValue(
tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
}
@Override
public Instant timestamp() {
return windowedValue.getTimestamp();
}
public Collection<? extends BoundedWindow> windows() {
return windowedValue.getWindows();
}
private void checkTimestamp(Instant timestamp) {
if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
throw new IllegalArgumentException(
String.format(
"Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+ "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+ "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed "
+ "skew.",
timestamp,
windowedValue.getTimestamp(),
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
}
}
@Override
protected <AggregatorInputT, AggregatorOutputT>
Aggregator<AggregatorInputT, AggregatorOutputT> createAggregator(
String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
return context.createAggregator(name, combiner);
}
@Override
public BoundedWindow window() {
return Iterables.getOnlyElement(windowedValue.getWindows());
}
@Override
public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
return this;
}
@Override
public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
return this;
}
@Override
public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
"Cannot access OnTimerContext outside of @OnTimer methods.");
}
@Override
public RestrictionTracker<?> restrictionTracker() {
throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
}
@Override
public State state(String stateId) {
try {
StateSpec<?, ?> spec =
(StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
return stepContext
.stateInternals()
.state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public Timer timer(String timerId) {
try {
TimerSpec spec =
(TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
return new TimerInternalsTimer(
window(), getNamespace(), allowedLateness, timerId, spec, stepContext.timerInternals());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
/**
* A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used for running a {@link
* DoFn} on a timer.
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
private class OnTimerArgumentProvider<InputT, OutputT>
extends DoFn<InputT, OutputT>.OnTimerContext
implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
final DoFn<InputT, OutputT> fn;
final DoFnContext<InputT, OutputT> context;
private final BoundedWindow window;
private final Instant timestamp;
private final TimeDomain timeDomain;
private final Duration allowedLateness;
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
private StateNamespace namespace;
/**
* The state namespace for this context.
*
* <p>Any call to {@link #getNamespace()} when more than one window is present will crash; this
* represents a bug in the runner or the {@link DoFnSignature}, since values must be in exactly
* one window when state or timers are relevant.
*/
private StateNamespace getNamespace() {
if (namespace == null) {
namespace = StateNamespaces.window(windowCoder, window);
}
return namespace;
}
private OnTimerArgumentProvider(
DoFn<InputT, OutputT> fn,
DoFnContext<InputT, OutputT> context,
BoundedWindow window,
Duration allowedLateness,
Instant timestamp,
TimeDomain timeDomain) {
fn.super();
this.fn = fn;
this.context = context;
this.window = window;
this.allowedLateness = allowedLateness;
this.timestamp = timestamp;
this.timeDomain = timeDomain;
}
@Override
public Instant timestamp() {
return timestamp;
}
@Override
public BoundedWindow window() {
return window;
}
@Override
public TimeDomain timeDomain() {
return timeDomain;
}
@Override
public Context context(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("Context parameters are not supported.");
}
@Override
public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("ProcessContext parameters are not supported.");
}
@Override
public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
return this;
}
@Override
public RestrictionTracker<?> restrictionTracker() {
throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
}
@Override
public State state(String stateId) {
try {
StateSpec<?, ?> spec =
(StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
return stepContext
.stateInternals()
.state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public Timer timer(String timerId) {
try {
TimerSpec spec =
(TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
return new TimerInternalsTimer(
window, getNamespace(), allowedLateness, timerId, spec, stepContext.timerInternals());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public PipelineOptions getPipelineOptions() {
return context.getPipelineOptions();
}
@Override
public void output(OutputT output) {
context.outputWindowedValue(
output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING);
}
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
context.outputWindowedValue(
output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}
@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
context.sideOutputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
context.sideOutputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}
@Override
protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
String name,
CombineFn<AggInputT, ?, AggOutputT> combiner) {
throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method");
}
}
private static class TimerInternalsTimer implements Timer {
private final TimerInternals timerInternals;
// The window and the namespace represent the same thing, but the namespace is a cached
// and specially encoded form. Since the namespace can be cached across timers, it is
// passed in whole rather than being computed here.
private final BoundedWindow window;
private final Duration allowedLateness;
private final StateNamespace namespace;
private final String timerId;
private final TimerSpec spec;
public TimerInternalsTimer(
BoundedWindow window,
StateNamespace namespace,
Duration allowedLateness,
String timerId,
TimerSpec spec,
TimerInternals timerInternals) {
this.window = window;
this.allowedLateness = allowedLateness;
this.namespace = namespace;
this.timerId = timerId;
this.spec = spec;
this.timerInternals = timerInternals;
}
@Override
public void set(Instant target) {
verifyAbsoluteTimeDomain();
verifyTargetTime(target);
setUnderlyingTimer(target);
}
@Override
public void setForNowPlus(Duration durationFromNow) {
Instant target = getCurrentTime().plus(durationFromNow);
verifyTargetTime(target);
setUnderlyingTimer(target);
}
/**
* Ensures that the target time is reasonable. For event time timers this means that the
* time should be prior to window GC time.
*/
private void verifyTargetTime(Instant target) {
if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
checkArgument(!target.isAfter(windowExpiry),
"Attempted to set event time timer for %s but that is after"
+ " the expiration of window %s", target, windowExpiry);
}
}
/** Verifies that the time domain of this timer is acceptable for absolute timers. */
private void verifyAbsoluteTimeDomain() {
if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
throw new IllegalStateException(
"Cannot only set relative timers in processing time domain."
+ " Use #setForNowPlus(Duration)");
}
}
/**
* Sets the timer for the target time without checking anything about whether it is
* a reasonable thing to do. For example, absolute processing time timers are not
* really sensible since the user has no way to compute a good choice of time.
*/
private void setUnderlyingTimer(Instant target) {
timerInternals.setTimer(namespace, timerId, target, spec.getTimeDomain());
}
@Override
public void cancel() {
timerInternals.deleteTimer(namespace, timerId);
}
private Instant getCurrentTime() {
switch(spec.getTimeDomain()) {
case EVENT_TIME:
return timerInternals.currentInputWatermarkTime();
case PROCESSING_TIME:
return timerInternals.currentProcessingTime();
case SYNCHRONIZED_PROCESSING_TIME:
return timerInternals.currentSynchronizedProcessingTime();
default:
throw new IllegalStateException(
String.format("Timer created for unknown time domain %s", spec.getTimeDomain()));
}
}
}
}