| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.core.DoFnRunners.OutputManager; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.schemas.SchemaCoder; |
| import org.apache.beam.sdk.state.State; |
| import org.apache.beam.sdk.state.StateSpec; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.sdk.state.Timer; |
| import org.apache.beam.sdk.state.TimerMap; |
| import org.apache.beam.sdk.state.TimerSpec; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; |
| import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; |
| import org.apache.beam.sdk.transforms.DoFnOutputReceivers; |
| import org.apache.beam.sdk.transforms.DoFnSchemaInformation; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; |
| import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; |
| import org.apache.beam.sdk.transforms.reflect.DoFnSignature; |
| import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; |
| import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.util.SystemDoFnInternal; |
| import org.apache.beam.sdk.util.UserCodeException; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.Row; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.joda.time.format.PeriodFormat; |
| |
| /** |
| * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in. |
| * |
| * <p>Also, if the {@link DoFn} observes the window of the element, then {@link SimpleDoFnRunner} |
| * explodes windows of the input {@link WindowedValue} and calls {@link DoFn.ProcessElement} for |
| * each window individually. |
| * |
| * @param <InputT> the type of the {@link DoFn} (main) input elements |
| * @param <OutputT> the type of the {@link DoFn} (main) output elements |
| */ |
| public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { |
| |
| /** Pipeline options exposed through the bundle, element and timer contexts. */ |
| private final PipelineOptions options; |
| /** The {@link DoFn} being run. */ |
| private final DoFn<InputT, OutputT> fn; |
| |
| /** The {@link DoFnInvoker} through which the bundle, element and timer methods are called. */ |
| private final DoFnInvoker<InputT, OutputT> invoker; |
| |
| /** Source of side input values; a non-empty reader also forces per-window processing. */ |
| private final SideInputReader sideInputReader; |
| /** Destination of every output emitted through the contexts of this runner. */ |
| private final OutputManager outputManager; |
| |
| /** Tag used when an output is emitted without an explicit tag. */ |
| private final TupleTag<OutputT> mainOutputTag; |
| /** The set of known output tags. */ |
| private final Set<TupleTag<?>> outputTags; |
| |
| /** |
| * True when the process-element method observes the window or the side input reader is not |
| * empty; input elements are then processed one window at a time. |
| */ |
| private final boolean observesWindow; |
| |
| /** Signature of the class of {@link #fn}; used to look up state and timer declarations. */ |
| private final DoFnSignature signature; |
| |
| /** Window coder taken from the windowing strategy; used to build state namespaces. */ |
| private final Coder<BoundedWindow> windowCoder; |
| |
| /** Allowed lateness taken from the windowing strategy. */ |
| private final Duration allowedLateness; |
| |
| // Because of setKey(Object), we really must refresh stateInternals() at each access |
| private final StepContext stepContext; |
| |
| /** The input coder when it is a {@link SchemaCoder}, otherwise {@code null}. */ |
| @Nullable private final SchemaCoder<InputT> schemaCoder; |
| |
| /** The main output coder when it is a {@link SchemaCoder}, otherwise {@code null}. */ |
| @Nullable final SchemaCoder<OutputT> mainOutputSchemaCoder; |
| |
| /** Output coders keyed by tag, as passed to the constructor; given to tagged receivers. */ |
| @Nullable private Map<TupleTag<?>, Coder<?>> outputCoders; |
| |
| /** Supplies the element converters used by {@code schemaElement(int)}. */ |
| @Nullable private final DoFnSchemaInformation doFnSchemaInformation; |
| |
| /** Side input views keyed by the id passed to {@code sideInput(String)}. */ |
| private final Map<String, PCollectionView<?>> sideInputMapping; |
| |
| /** |
| * Creates a runner for {@code fn}. |
| * |
| * <p>Derives the {@link DoFnSignature} and the {@link DoFnInvoker} from {@code fn}, records |
| * whether elements must be processed one window at a time, and picks out the {@link |
| * SchemaCoder}s (if any) of the input and of the main output. |
| * |
| * @param options pipeline options handed out by the contexts of this runner |
| * @param fn the {@link DoFn} to run |
| * @param sideInputReader source of side input values; when not empty, windows are exploded |
| * @param outputManager receiver of every emitted {@link WindowedValue} |
| * @param mainOutputTag tag of the main output |
| * @param additionalOutputTags tags of the other outputs that may be written to |
| * @param stepContext provider of state and timer internals |
| * @param inputCoder coder of the input elements, may be {@code null} |
| * @param outputCoders coders of the outputs keyed by tag; may be {@code null} or empty |
| * @param windowingStrategy supplies the window coder and the allowed lateness |
| * @param doFnSchemaInformation supplies the element converters for schema parameters |
| * @param sideInputMapping side input views keyed by side input id |
| */ |
| public SimpleDoFnRunner( |
| PipelineOptions options, |
| DoFn<InputT, OutputT> fn, |
| SideInputReader sideInputReader, |
| OutputManager outputManager, |
| TupleTag<OutputT> mainOutputTag, |
| List<TupleTag<?>> additionalOutputTags, |
| StepContext stepContext, |
| @Nullable Coder<InputT> inputCoder, |
| Map<TupleTag<?>, Coder<?>> outputCoders, |
| WindowingStrategy<?, ?> windowingStrategy, |
| DoFnSchemaInformation doFnSchemaInformation, |
| Map<String, PCollectionView<?>> sideInputMapping) { |
| this.options = options; |
| this.fn = fn; |
| this.signature = DoFnSignatures.getSignature(fn.getClass()); |
| // Side inputs are read per window, so a non-empty reader also requires exploding windows. |
| boolean fnObservesWindow = signature.processElement().observesWindow(); |
| this.observesWindow = fnObservesWindow || !sideInputReader.isEmpty(); |
| this.invoker = DoFnInvokers.invokerFor(fn); |
| this.sideInputReader = sideInputReader; |
| |
| // instanceof is false for null, so no separate null check is needed. |
| this.schemaCoder = |
| (inputCoder instanceof SchemaCoder) ? (SchemaCoder<InputT>) inputCoder : null; |
| |
| this.outputCoders = outputCoders; |
| Coder<?> mainOutputCoder = |
| (outputCoders == null || outputCoders.isEmpty()) ? null : outputCoders.get(mainOutputTag); |
| this.mainOutputSchemaCoder = |
| (mainOutputCoder instanceof SchemaCoder) ? (SchemaCoder<OutputT>) mainOutputCoder : null; |
| |
| this.outputManager = outputManager; |
| this.mainOutputTag = mainOutputTag; |
| Set<TupleTag<?>> knownTags = Sets.newHashSet(); |
| knownTags.add(mainOutputTag); |
| knownTags.addAll(additionalOutputTags); |
| this.outputTags = knownTags; |
| this.stepContext = stepContext; |
| |
| // This is a cast of an _invariant_ coder. But we are assured by pipeline validation |
| // that it really is the coder for whatever BoundedWindow subclass is provided |
| @SuppressWarnings("unchecked") |
| Coder<BoundedWindow> castWindowCoder = |
| (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(); |
| this.windowCoder = castWindowCoder; |
| this.allowedLateness = windowingStrategy.getAllowedLateness(); |
| this.doFnSchemaInformation = doFnSchemaInformation; |
| this.sideInputMapping = sideInputMapping; |
| } |
| |
| /** Returns the {@link DoFn} this runner was constructed with. */ |
| @Override |
| public DoFn<InputT, OutputT> getFn() { |
| return this.fn; |
| } |
| |
| @Override |
| public void startBundle() { |
| // This can contain user code. Wrap it in case it throws an exception. |
| try { |
| invoker.invokeStartBundle(new DoFnStartBundleContext()); |
| } catch (Throwable t) { |
| // Exception in user code. |
| throw wrapUserCodeException(t); |
| } |
| } |
| |
| /** |
| * Processes one input value. |
| * |
| * <p>When windows are observed, the value is exploded and the {@link DoFn} is invoked once per |
| * window; otherwise it is invoked once with the value as received. |
| * |
| * @param compressedElem the input value, possibly in several windows |
| */ |
| @Override |
| public void processElement(WindowedValue<InputT> compressedElem) { |
| if (!observesWindow) { |
| invokeProcessElement(compressedElem); |
| return; |
| } |
| for (WindowedValue<InputT> singleWindowElem : compressedElem.explodeWindows()) { |
| invokeProcessElement(singleWindowElem); |
| } |
| } |
| |
| /** |
| * Fires the timer {@code timerId} for {@code window}. |
| * |
| * <p>The timestamp given to the timer callback (and used for outputs that do not specify one) |
| * is {@code outputTimestamp} for event-time timers and the current input watermark for the |
| * processing-time domains. |
| * |
| * @param timerId id of the timer being fired |
| * @param timerFamilyId id of the timer family, forwarded to the invoker |
| * @param window window the timer belongs to |
| * @param timestamp firing timestamp of the timer (not read by this method) |
| * @param outputTimestamp output timestamp of the timer |
| * @param timeDomain time domain of the timer |
| * @throws IllegalArgumentException if {@code timeDomain} is not one of the handled domains |
| */ |
| @Override |
| public void onTimer( |
| String timerId, |
| String timerFamilyId, |
| BoundedWindow window, |
| Instant timestamp, |
| Instant outputTimestamp, |
| TimeDomain timeDomain) { |
| |
| // Timestamp that derived elements receive unless they specify their own. |
| final Instant derivedTimestamp; |
| switch (timeDomain) { |
| case EVENT_TIME: |
| derivedTimestamp = outputTimestamp; |
| break; |
| case PROCESSING_TIME: |
| case SYNCHRONIZED_PROCESSING_TIME: |
| // The input watermark is by definition non-late. |
| derivedTimestamp = stepContext.timerInternals().currentInputWatermarkTime(); |
| break; |
| default: |
| throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); |
| } |
| |
| invoker.invokeOnTimer( |
| timerId, |
| timerFamilyId, |
| new OnTimerArgumentProvider(timerId, window, derivedTimestamp, timeDomain)); |
| } |
| |
| /** |
| * Invokes the process-element method for {@code elem} with a new {@link DoFnProcessContext}. |
| * |
| * <p>Any {@link Exception} raised is handed to {@link #wrapUserCodeException(Throwable)}. |
| */ |
| private void invokeProcessElement(WindowedValue<InputT> elem) { |
| try { |
| DoFnProcessContext processContext = new DoFnProcessContext(elem); |
| invoker.invokeProcessElement(processContext); |
| } catch (Exception userFailure) { |
| // The invoked method can run user code. |
| throw wrapUserCodeException(userFailure); |
| } |
| } |
| |
| @Override |
| public void finishBundle() { |
| // This can contain user code. Wrap it in case it throws an exception. |
| try { |
| invoker.invokeFinishBundle(new DoFnFinishBundleContext()); |
| } catch (Throwable t) { |
| // Exception in user code. |
| throw wrapUserCodeException(t); |
| } |
| } |
| |
| /** |
| * Passes {@code t} through {@link UserCodeException#wrapIf}, wrapping only when the {@link |
| * DoFn} is not reported as a system one. |
| * |
| * <p>Despite the declared return type, this method itself throws the resulting exception; |
| * the callers in this class use it as the operand of a {@code throw} statement. |
| */ |
| private RuntimeException wrapUserCodeException(Throwable t) { |
| boolean isUserCode = !isSystemDoFn(); |
| throw UserCodeException.wrapIf(isUserCode, t); |
| } |
| |
| /** |
| * Reports whether the class of the invoker carries {@link SystemDoFnInternal}. |
| * |
| * <p>NOTE(review): the annotation is looked up on the invoker's class, not on the class of |
| * {@code fn} - confirm that this is intended. |
| */ |
| private boolean isSystemDoFn() { |
| Class<?> invokerClass = invoker.getClass(); |
| return invokerClass.isAnnotationPresent(SystemDoFnInternal.class); |
| } |
| |
| /** |
| * Reads the value of {@code view} for {@code sideInputWindow} from the side input reader. |
| * |
| * @throws IllegalArgumentException if the reader does not contain {@code view} |
| */ |
| private <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { |
| if (sideInputReader.contains(view)) { |
| return sideInputReader.get(view, sideInputWindow); |
| } |
| throw new IllegalArgumentException("calling sideInput() with unknown view"); |
| } |
| |
| /** |
| * Hands {@code windowedElem} to the output manager under {@code tag}. |
| * |
| * @throws IllegalArgumentException if {@code tag} is not one of the known output tags |
| */ |
| private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { |
| boolean isKnownTag = outputTags.contains(tag); |
| checkArgument(isKnownTag, "Unknown output tag %s", tag); |
| outputManager.output(tag, windowedElem); |
| } |
| |
| /** A concrete implementation of {@link DoFn.StartBundleContext}. */ |
| private class DoFnStartBundleContext extends DoFn<InputT, OutputT>.StartBundleContext |
| implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { |
| private DoFnStartBundleContext() { |
| fn.super(); |
| } |
| |
| @Override |
| public PipelineOptions getPipelineOptions() { |
| return options; |
| } |
| |
| @Override |
| public BoundedWindow window() { |
| throw new UnsupportedOperationException( |
| "Cannot access window outside of @ProcessElement and @OnTimer methods."); |
| } |
| |
| @Override |
| public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access paneInfo outside of @ProcessElement methods."); |
| } |
| |
| @Override |
| public PipelineOptions pipelineOptions() { |
| return getPipelineOptions(); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { |
| return this; |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( |
| DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access FinishBundleContext outside of @FinishBundle method."); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access ProcessContext outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public InputT element(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Element parameters are not supported outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public InputT sideInput(String tagId) { |
| throw new UnsupportedOperationException( |
| "SideInput parameters are not supported outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public Object schemaElement(int index) { |
| throw new UnsupportedOperationException( |
| "Element parameters are not supported outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access timestamp outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public String timerId(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException("Cannot access timerId outside of @OnTimer method."); |
| } |
| |
| @Override |
| public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access time domain outside of @ProcessTimer method."); |
| } |
| |
| @Override |
| public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access output receiver outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access output receiver outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access output receiver outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access OnTimerContext outside of @OnTimer methods."); |
| } |
| |
| @Override |
| public RestrictionTracker<?, ?> restrictionTracker() { |
| throw new UnsupportedOperationException( |
| "Cannot access RestrictionTracker outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public State state(String stateId) { |
| throw new UnsupportedOperationException( |
| "Cannot access state outside of @ProcessElement and @OnTimer methods."); |
| } |
| |
| @Override |
| public Timer timer(String timerId) { |
| throw new UnsupportedOperationException( |
| "Cannot access timers outside of @ProcessElement and @OnTimer methods."); |
| } |
| |
| @Override |
| public TimerMap timerFamily(String tagId) { |
| throw new UnsupportedOperationException( |
| "Cannot access timer family outside of @ProcessElement and @OnTimer methods"); |
| } |
| } |
| |
| /** |
| * A concrete implementation of {@link DoFn.FinishBundleContext}. |
| * |
| * <p>It also serves as the {@link DoFnInvoker.ArgumentProvider} for the finish-bundle call. |
| * Only the pipeline options and the context itself are available as arguments; every other |
| * accessor throws {@link UnsupportedOperationException}. Outputs must name their timestamp |
| * and window explicitly. |
| */ |
| private class DoFnFinishBundleContext extends DoFn<InputT, OutputT>.FinishBundleContext |
| implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { |
| private DoFnFinishBundleContext() { |
| fn.super(); |
| } |
| |
| /** Returns the pipeline options of the enclosing runner. */ |
| @Override |
| public PipelineOptions getPipelineOptions() { |
| return options; |
| } |
| |
| @Override |
| public BoundedWindow window() { |
| throw new UnsupportedOperationException( |
| "Cannot access window outside of @ProcessElement and @OnTimer methods."); |
| } |
| |
| @Override |
| public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access paneInfo outside of @ProcessElement methods."); |
| } |
| |
| /** Returns the same options as {@link #getPipelineOptions()}. */ |
| @Override |
| public PipelineOptions pipelineOptions() { |
| return getPipelineOptions(); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access StartBundleContext outside of @StartBundle method."); |
| } |
| |
| /** Returns this object, which is the finish-bundle context. */ |
| @Override |
| public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( |
| DoFn<InputT, OutputT> doFn) { |
| return this; |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access ProcessContext outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public InputT element(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access element outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public InputT sideInput(String tagId) { |
| throw new UnsupportedOperationException( |
| "Cannot access sideInput outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public Object schemaElement(int index) { |
| throw new UnsupportedOperationException( |
| "Cannot access element outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access timestamp outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public String timerId(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access timerId as parameter outside of @OnTimer method."); |
| } |
| |
| @Override |
| public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { |
| // The time domain is only provided by the timer argument provider, i.e. for @OnTimer. |
| throw new UnsupportedOperationException( |
| "Cannot access time domain outside of @OnTimer method."); |
| } |
| |
| @Override |
| public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access outputReceiver in @FinishBundle method."); |
| } |
| |
| @Override |
| public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access outputReceiver in @FinishBundle method."); |
| } |
| |
| @Override |
| public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access outputReceiver in @FinishBundle method."); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access OnTimerContext outside of @OnTimer methods."); |
| } |
| |
| @Override |
| public RestrictionTracker<?, ?> restrictionTracker() { |
| throw new UnsupportedOperationException( |
| "Cannot access RestrictionTracker outside of @ProcessElement method."); |
| } |
| |
| @Override |
| public State state(String stateId) { |
| throw new UnsupportedOperationException( |
| "Cannot access state outside of @ProcessElement and @OnTimer methods."); |
| } |
| |
| @Override |
| public Timer timer(String timerId) { |
| throw new UnsupportedOperationException( |
| "Cannot access timers outside of @ProcessElement and @OnTimer methods."); |
| } |
| |
| @Override |
| public TimerMap timerFamily(String tagId) { |
| throw new UnsupportedOperationException( |
| "Cannot access timerFamily outside of @ProcessElement and @OnTimer methods."); |
| } |
| |
| /** Emits {@code output} to the main output with the given timestamp and window. */ |
| @Override |
| public void output(OutputT output, Instant timestamp, BoundedWindow window) { |
| output(mainOutputTag, output, timestamp, window); |
| } |
| |
| /** |
| * Emits {@code output} to {@code tag} with the given timestamp and window; the pane is always |
| * {@link PaneInfo#NO_FIRING}. |
| */ |
| @Override |
| public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { |
| outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); |
| } |
| } |
| |
| /** |
| * A concrete implementation of {@link DoFn.ProcessContext} used for running a {@link DoFn} over a |
| * single element. |
| * |
| * <p>It also serves as the {@link DoFnInvoker.ArgumentProvider} for the process-element call. |
| * Outputs inherit the timestamp, windows and pane of the input element unless a timestamp is |
| * given explicitly. |
| */ |
| private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext |
| implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { |
| /** The element being processed. */ |
| final WindowedValue<InputT> elem; |
| /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ |
| @Nullable private StateNamespace namespace; |
| |
| /** |
| * The state namespace for this context. |
| * |
| * <p>Any call to this method when more than one window is present will crash; this represents a |
| * bug in the runner or the {@link DoFnSignature}, since values must be in exactly one window |
| * when state or timers are relevant. |
| */ |
| private StateNamespace getNamespace() { |
| if (namespace == null) { |
| namespace = StateNamespaces.window(windowCoder, window()); |
| } |
| return namespace; |
| } |
| |
| private DoFnProcessContext(WindowedValue<InputT> elem) { |
| fn.super(); |
| this.elem = elem; |
| } |
| |
| /** Returns the pipeline options of the enclosing runner. */ |
| @Override |
| public PipelineOptions getPipelineOptions() { |
| return options; |
| } |
| |
| /** Returns the value of the element being processed. */ |
| @Override |
| public InputT element() { |
| return elem.getValue(); |
| } |
| |
| /** |
| * Reads {@code view} for the side input window that the view's window mapping assigns to the |
| * single window of the element. |
| * |
| * @throws NullPointerException if {@code view} is null |
| */ |
| @Override |
| public <T> T sideInput(PCollectionView<T> view) { |
| checkNotNull(view, "View passed to sideInput cannot be null"); |
| BoundedWindow window = Iterables.getOnlyElement(windows()); |
| return SimpleDoFnRunner.this.sideInput( |
| view, view.getWindowMappingFn().getSideInputWindow(window)); |
| } |
| |
| /** Returns the pane of the element being processed. */ |
| @Override |
| public PaneInfo pane() { |
| return elem.getPane(); |
| } |
| |
| @Override |
| public void updateWatermark(Instant watermark) { |
| throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()"); |
| } |
| |
| /** Emits {@code output} to the main output. */ |
| @Override |
| public void output(OutputT output) { |
| output(mainOutputTag, output); |
| } |
| |
| /** Emits {@code output} to the main output with an explicit, skew-checked timestamp. */ |
| @Override |
| public void outputWithTimestamp(OutputT output, Instant timestamp) { |
| checkTimestamp(timestamp); |
| outputWithTimestamp(mainOutputTag, output, timestamp); |
| } |
| |
| /** Emits {@code output} to {@code tag}, keeping the input's timestamp, windows and pane. */ |
| @Override |
| public <T> void output(TupleTag<T> tag, T output) { |
| checkNotNull(tag, "Tag passed to output cannot be null"); |
| outputWindowedValue(tag, elem.withValue(output)); |
| } |
| |
| /** |
| * Emits {@code output} to {@code tag} with {@code timestamp}, keeping the input's windows and |
| * pane. |
| * |
| * @throws IllegalArgumentException if {@code timestamp} is earlier than the allowed skew permits |
| */ |
| @Override |
| public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { |
| checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); |
| checkTimestamp(timestamp); |
| outputWindowedValue( |
| tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane())); |
| } |
| |
| /** Returns the timestamp of the element being processed. */ |
| @Override |
| public Instant timestamp() { |
| return elem.getTimestamp(); |
| } |
| |
| /** Returns all windows of the element being processed. */ |
| public Collection<? extends BoundedWindow> windows() { |
| return elem.getWindows(); |
| } |
| |
| /** |
| * Rejects output timestamps earlier than the input timestamp minus the allowed skew of the |
| * {@link DoFn}. |
| * |
| * @throws IllegalArgumentException if {@code timestamp} is too early |
| */ |
| @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected |
| private void checkTimestamp(Instant timestamp) { |
| // The documentation of getAllowedTimestampSkew explicitly permits Long.MAX_VALUE to be used |
| // for infinite skew. Defend against underflow in that case for timestamps before the epoch |
| if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE |
| && timestamp.isBefore(elem.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { |
| throw new IllegalArgumentException( |
| String.format( |
| "Cannot output with timestamp %s. Output timestamps must be no earlier than the " |
| + "timestamp of the current input (%s) minus the allowed skew (%s). See the " |
| + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed " |
| + "skew.", |
| timestamp, |
| elem.getTimestamp(), |
| PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); |
| } |
| } |
| |
| /** Returns the single window of the element; fails if the element is in several windows. */ |
| @Override |
| public BoundedWindow window() { |
| return Iterables.getOnlyElement(elem.getWindows()); |
| } |
| |
| @Override |
| public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { |
| return pane(); |
| } |
| |
| @Override |
| public PipelineOptions pipelineOptions() { |
| return getPipelineOptions(); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( |
| DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException("FinishBundleContext parameters are not supported."); |
| } |
| |
| /** Returns this object, which is the process context. */ |
| @Override |
| public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { |
| return this; |
| } |
| |
| @Override |
| public InputT element(DoFn<InputT, OutputT> doFn) { |
| return element(); |
| } |
| |
| /** Reads the side input whose view is registered under {@code tagId}. */ |
| @Override |
| public Object sideInput(String tagId) { |
| return sideInput(sideInputMapping.get(tagId)); |
| } |
| |
| /** Returns the element converted by the {@code index}-th element converter. */ |
| @Override |
| public Object schemaElement(int index) { |
| SerializableFunction converter = doFnSchemaInformation.getElementConverters().get(index); |
| return converter.apply(element()); |
| } |
| |
| @Override |
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { |
| return timestamp(); |
| } |
| |
| @Override |
| public String timerId(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access timerId as parameter outside of @OnTimer method."); |
| } |
| |
| @Override |
| public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { |
| // The time domain is only provided by the timer argument provider, i.e. for @OnTimer. |
| throw new UnsupportedOperationException( |
| "Cannot access time domain outside of @OnTimer method."); |
| } |
| |
| @Override |
| public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { |
| return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag); |
| } |
| |
| @Override |
| public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { |
| return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder); |
| } |
| |
| @Override |
| public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { |
| return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access OnTimerContext outside of @OnTimer methods."); |
| } |
| |
| @Override |
| public RestrictionTracker<?, ?> restrictionTracker() { |
| throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); |
| } |
| |
| /** |
| * Returns the state declared under {@code stateId}, scoped to the window of the element. |
| * |
| * <p>The {@link StateSpec} is read reflectively from the declaring field of the {@link DoFn}. |
| */ |
| @Override |
| public State state(String stateId) { |
| try { |
| StateSpec<?> spec = |
| (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn); |
| return stepContext |
| .stateInternals() |
| .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); |
| } catch (IllegalAccessException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Returns the timer declared under {@code timerId}, scoped to the window of the element. |
| * |
| * <p>The {@link TimerSpec} is read reflectively from the declaring field of the {@link DoFn}. |
| */ |
| @Override |
| public Timer timer(String timerId) { |
| try { |
| TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); |
| return new TimerInternalsTimer( |
| window(), getNamespace(), timerId, spec, stepContext.timerInternals()); |
| } catch (IllegalAccessException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Returns the timer family declared under {@code timerFamilyId}, scoped to the window of the |
| * element. |
| */ |
| @Override |
| public TimerMap timerFamily(String timerFamilyId) { |
| try { |
| TimerSpec spec = |
| (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); |
| return new TimerInternalsTimerMap( |
| timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals()); |
| } catch (IllegalAccessException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| /** |
| * The {@link DoFn.OnTimerContext} and {@link DoFnInvoker.ArgumentProvider} handed to the invoker |
| * when a timer fires. |
| * |
| * <p>It carries the timer id, the window, the timestamp and the time domain of the firing; |
| * element-related arguments are not available. |
| */ |
| private class OnTimerArgumentProvider extends DoFn<InputT, OutputT>.OnTimerContext |
| implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { |
| private final BoundedWindow window; |
| private final Instant timestamp; |
| private final TimeDomain timeDomain; |
| private final String timerId; |
| |
| /** Computed on first use; read it only through {@link #getNamespace()}. */ |
| private @Nullable StateNamespace namespace; |
| |
| /** Returns the state namespace of this timer's window, creating it on the first call. */ |
| private StateNamespace getNamespace() { |
| StateNamespace cached = namespace; |
| if (cached == null) { |
| cached = StateNamespaces.window(windowCoder, window); |
| namespace = cached; |
| } |
| return cached; |
| } |
| |
| private OnTimerArgumentProvider( |
| String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { |
| fn.super(); |
| this.timerId = timerId; |
| this.window = window; |
| this.timestamp = timestamp; |
| this.timeDomain = timeDomain; |
| } |
| |
| /** Returns the timestamp this provider was created with. */ |
| @Override |
| public Instant timestamp() { |
| return timestamp; |
| } |
| |
| /** Returns the window the timer belongs to. */ |
| @Override |
| public BoundedWindow window() { |
| return window; |
| } |
| |
| @Override |
| public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException( |
| "Cannot access paneInfo outside of @ProcessElement methods."); |
| } |
| |
| @Override |
| public PipelineOptions pipelineOptions() { |
| return options; |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( |
| DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException("FinishBundleContext parameters are not supported."); |
| } |
| |
| /** Returns the time domain of the timer. */ |
| @Override |
| public TimeDomain timeDomain() { |
| return timeDomain; |
| } |
| |
| @Override |
| public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException("ProcessContext parameters are not supported."); |
| } |
| |
| @Override |
| public InputT element(DoFn<InputT, OutputT> doFn) { |
| throw new UnsupportedOperationException("Element parameters are not supported."); |
| } |
| |
| @Override |
| public Object sideInput(String tagId) { |
| throw new UnsupportedOperationException("SideInput parameters are not supported."); |
| } |
| |
| @Override |
| public Object schemaElement(int index) { |
| throw new UnsupportedOperationException("Element parameters are not supported."); |
| } |
| |
| @Override |
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { |
| return timestamp; |
| } |
| |
| @Override |
| public String timerId(DoFn<InputT, OutputT> doFn) { |
| return timerId; |
| } |
| |
| @Override |
| public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { |
| return timeDomain; |
| } |
| |
| @Override |
| public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { |
| return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag); |
| } |
| |
| @Override |
| public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { |
| return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder); |
| } |
| |
| @Override |
| public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { |
| return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders); |
| } |
| |
| /** Returns this object, which is the timer context. */ |
| @Override |
| public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { |
| return this; |
| } |
| |
| @Override |
| public RestrictionTracker<?, ?> restrictionTracker() { |
| throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); |
| } |
| |
| /** |
| * Looks up the state declared under {@code stateId} in the namespace of the timer's window. |
| * The {@link StateSpec} comes from the declaring field of the {@link DoFn}, read reflectively. |
| */ |
| @Override |
| public State state(String stateId) { |
| try { |
| StateSpec<?> declaredSpec = |
| (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn); |
| return stepContext |
| .stateInternals() |
| .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) declaredSpec)); |
| } catch (IllegalAccessException accessFailure) { |
| throw new RuntimeException(accessFailure); |
| } |
| } |
| |
| /** |
| * Creates a handle for the timer declared under {@code timerId}, bound to the timer's window. |
| * The {@link TimerSpec} comes from the declaring field of the {@link DoFn}, read reflectively. |
| */ |
| @Override |
| public Timer timer(String timerId) { |
| try { |
| TimerSpec declaredSpec = |
| (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); |
| return new TimerInternalsTimer( |
| window, getNamespace(), timerId, declaredSpec, stepContext.timerInternals()); |
| } catch (IllegalAccessException accessFailure) { |
| throw new RuntimeException(accessFailure); |
| } |
| } |
| |
| /** |
| * Creates a handle for the timer family declared under {@code timerFamilyId}, bound to the |
| * timer's window. |
| */ |
| @Override |
| public TimerMap timerFamily(String timerFamilyId) { |
| try { |
| TimerSpec declaredSpec = |
| (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); |
| return new TimerInternalsTimerMap( |
| timerFamilyId, window, getNamespace(), declaredSpec, stepContext.timerInternals()); |
| } catch (IllegalAccessException accessFailure) { |
| throw new RuntimeException(accessFailure); |
| } |
| } |
| |
| /** Returns the pipeline options of the enclosing runner. */ |
| @Override |
| public PipelineOptions getPipelineOptions() { |
| return options; |
| } |
| |
| /** Emits {@code output} to the main output at this context's timestamp. */ |
| @Override |
| public void output(OutputT output) { |
| output(mainOutputTag, output); |
| } |
| |
| /** Emits {@code output} to the main output at the given timestamp. */ |
| @Override |
| public void outputWithTimestamp(OutputT output, Instant timestamp) { |
| outputWithTimestamp(mainOutputTag, output, timestamp); |
| } |
| |
| /** |
| * Emits {@code output} to {@code tag} in the timer's window at this context's timestamp, with |
| * pane {@link PaneInfo#NO_FIRING}. |
| */ |
| @Override |
| public <T> void output(TupleTag<T> tag, T output) { |
| WindowedValue<T> emitted = WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING); |
| outputWindowedValue(tag, emitted); |
| } |
| |
| /** |
| * Emits {@code output} to {@code tag} in the timer's window at the given timestamp, with pane |
| * {@link PaneInfo#NO_FIRING}. |
| */ |
| @Override |
| public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { |
| WindowedValue<T> emitted = WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING); |
| outputWindowedValue(tag, emitted); |
| } |
| } |
| |
| private class TimerInternalsTimer implements Timer { |
| private final TimerInternals timerInternals; |
| |
| // The window and the namespace represent the same thing, but the namespace is a cached |
| // and specially encoded form. Since the namespace can be cached across timers, it is |
| // passed in whole rather than being computed here. |
| private final BoundedWindow window; |
| private final StateNamespace namespace; |
| private final String timerId; |
| private final String timerFamilyId; |
| private final TimerSpec spec; |
| private Instant target; |
| private Instant outputTimestamp; |
| private Duration period = Duration.ZERO; |
| private Duration offset = Duration.ZERO; |
| |
| public TimerInternalsTimer( |
| BoundedWindow window, |
| StateNamespace namespace, |
| String timerId, |
| TimerSpec spec, |
| TimerInternals timerInternals) { |
| this.window = window; |
| this.namespace = namespace; |
| this.timerId = timerId; |
| this.timerFamilyId = ""; |
| this.spec = spec; |
| this.timerInternals = timerInternals; |
| } |
| |
| public TimerInternalsTimer( |
| BoundedWindow window, |
| StateNamespace namespace, |
| String timerId, |
| String timerFamilyId, |
| TimerSpec spec, |
| TimerInternals timerInternals) { |
| this.window = window; |
| this.namespace = namespace; |
| this.timerId = timerId; |
| this.timerFamilyId = timerFamilyId; |
| this.spec = spec; |
| this.timerInternals = timerInternals; |
| } |
| |
| @Override |
| public void set(Instant target) { |
| this.target = target; |
| verifyAbsoluteTimeDomain(); |
| setAndVerifyOutputTimestamp(); |
| setUnderlyingTimer(); |
| } |
| |
| @Override |
| public void setRelative() { |
| Instant now = getCurrentTime(); |
| if (period.equals(Duration.ZERO)) { |
| target = now.plus(offset); |
| } else { |
| long millisSinceStart = now.plus(offset).getMillis() % period.getMillis(); |
| target = millisSinceStart == 0 ? now : now.plus(period).minus(millisSinceStart); |
| } |
| target = minTargetAndGcTime(target); |
| |
| setAndVerifyOutputTimestamp(); |
| setUnderlyingTimer(); |
| } |
| |
| @Override |
| public Timer offset(Duration offset) { |
| this.offset = offset; |
| return this; |
| } |
| |
| @Override |
| public Timer align(Duration period) { |
| this.period = period; |
| return this; |
| } |
| |
| /** |
| * For event time timers the target time should be prior to window GC time. So it return |
| * min(time to set, GC Time of window). |
| */ |
| private Instant minTargetAndGcTime(Instant target) { |
| if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { |
| Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); |
| if (target.isAfter(windowExpiry)) { |
| return windowExpiry; |
| } |
| } |
| return target; |
| } |
| |
| @Override |
| public Timer withOutputTimestamp(Instant outputTimestamp) { |
| this.outputTimestamp = outputTimestamp; |
| return this; |
| } |
| |
| /** Verifies that the time domain of this timer is acceptable for absolute timers. */ |
| private void verifyAbsoluteTimeDomain() { |
| if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { |
| throw new IllegalStateException( |
| "Cannot only set relative timers in processing time domain." + " Use #setRelative()"); |
| } |
| } |
| |
| /** |
| * |
| * |
| * <ul> |
| * Ensures that: |
| * <li>Users can't set {@code outputTimestamp} for processing time timers. |
| * <li>Event time timers' {@code outputTimestamp} is set before window expiration. |
| * </ul> |
| */ |
| private void setAndVerifyOutputTimestamp() { |
| // Output timestamp is currently not supported in processing time timers. |
| if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { |
| throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); |
| } |
| // Output timestamp is set to the delivery time if not initialized by an user. |
| if (outputTimestamp == null) { |
| outputTimestamp = target; |
| } |
| |
| if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { |
| Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); |
| checkArgument( |
| !target.isAfter(windowExpiry), |
| "Attempted to set event time timer that outputs for %s but that is" |
| + " after the expiration of window %s", |
| target, |
| windowExpiry); |
| } |
| } |
| |
| /** |
| * Sets the timer for the target time without checking anything about whether it is a reasonable |
| * thing to do. For example, absolute processing time timers are not really sensible since the |
| * user has no way to compute a good choice of time. |
| */ |
| private void setUnderlyingTimer() { |
| timerInternals.setTimer( |
| namespace, timerId, timerFamilyId, target, outputTimestamp, spec.getTimeDomain()); |
| } |
| |
| private Instant getCurrentTime() { |
| switch (spec.getTimeDomain()) { |
| case EVENT_TIME: |
| return timerInternals.currentInputWatermarkTime(); |
| case PROCESSING_TIME: |
| return timerInternals.currentProcessingTime(); |
| case SYNCHRONIZED_PROCESSING_TIME: |
| return timerInternals.currentSynchronizedProcessingTime(); |
| default: |
| throw new IllegalStateException( |
| String.format("Timer created for unknown time domain %s", spec.getTimeDomain())); |
| } |
| } |
| } |
| |
| private class TimerInternalsTimerMap implements TimerMap { |
| |
| Map<String, Timer> timers = new HashMap<>(); |
| private final TimerInternals timerInternals; |
| private final BoundedWindow window; |
| private final StateNamespace namespace; |
| private final TimerSpec spec; |
| private final String timerFamilyId; |
| |
| public TimerInternalsTimerMap( |
| String timerFamilyId, |
| BoundedWindow window, |
| StateNamespace namespace, |
| TimerSpec spec, |
| TimerInternals timerInternals) { |
| this.window = window; |
| this.namespace = namespace; |
| this.spec = spec; |
| this.timerInternals = timerInternals; |
| this.timerFamilyId = timerFamilyId; |
| } |
| |
| @Override |
| public void set(String timerId, Instant absoluteTime) { |
| Timer timer = |
| new TimerInternalsTimer(window, namespace, timerId, timerFamilyId, spec, timerInternals); |
| timer.set(absoluteTime); |
| timers.put(timerId, timer); |
| } |
| |
| @Override |
| public Timer get(String timerId) { |
| if (timers.get(timerId) == null) { |
| Timer timer = |
| new TimerInternalsTimer( |
| window, namespace, timerId, timerFamilyId, spec, timerInternals); |
| timers.put(timerId, timer); |
| } |
| return timers.get(timerId); |
| } |
| } |
| } |