blob: 50402e472c8270cc50995a0070a2b7cec73a0dc8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.FnApiStateAccessor;
import org.apache.beam.fn.harness.state.FnApiTimerBundleTracker;
import org.apache.beam.fn.harness.state.FnApiTimerBundleTracker.Modifications;
import org.apache.beam.fn.harness.state.FnApiTimerBundleTracker.TimerInfo;
import org.apache.beam.fn.harness.state.SideInputSpec;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.construction.PCollectionViewTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.DelegatingArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerFamilyDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.TruncateResult;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
/**
* A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers of
* abstraction caused by StateInternals/TimerInternals since they model state and timer concepts
* differently.
*/
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"keyfor"
})
public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT> {
/** A registrar which provides a factory to handle Java {@link DoFn}s. */
@AutoService(PTransformRunnerFactory.Registrar.class)
public static class Registrar implements PTransformRunnerFactory.Registrar {
@Override
public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
Factory factory = new Factory();
return ImmutableMap.<String, PTransformRunnerFactory>builder()
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, factory)
.put(PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN, factory)
.put(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, factory)
.put(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN, factory)
.put(
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN, factory)
.build();
}
}
/**
 * Builds {@link FnApiDoFnRunner} instances from a {@link PTransformRunnerFactory.Context} and
 * registers an incoming timer endpoint on the context for every timer family known to the runner.
 */
static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT>
    implements PTransformRunnerFactory<
        FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT>> {
  @Override
  public final FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT>
      createRunnerForPTransform(Context context) {
    FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT> runner =
        new FnApiDoFnRunner<>(
            context.getPipelineOptions(),
            context.getShortIdMap(),
            context.getBeamFnStateClient(),
            context.getPTransformId(),
            context.getPTransform(),
            context.getProcessBundleInstructionIdSupplier(),
            context.getCacheTokensSupplier(),
            context.getBundleCacheSupplier(),
            context.getProcessWideCache(),
            context.getPCollections(),
            context.getCoders(),
            context.getWindowingStrategies(),
            context::addStartBundleFunction,
            context::addFinishBundleFunction,
            context::addResetFunction,
            context::addTearDownFunction,
            context::getPCollectionConsumer,
            context::addPCollectionConsumer,
            context::addOutgoingTimersEndpoint,
            context::addBundleProgressReporter,
            context.getSplitListener(),
            context.getBundleFinalizer());

    // Route each timer family either to the window-expiration handler or to the regular timer
    // handler. A family with an empty local name is never treated as the window-expiration family.
    runner.timerFamilyInfos.forEach(
        (familyName, domainAndCoder) -> {
          Coder<Timer<Object>> timerCoder = domainAndCoder.getValue();
          boolean isWindowExpirationFamily =
              !familyName.isEmpty()
                  && familyName.equals(
                      runner.parDoPayload.getOnWindowExpirationTimerFamilySpec());
          if (isWindowExpirationFamily) {
            context.addIncomingTimerEndpoint(
                familyName, timerCoder, timer -> runner.processOnWindowExpiration(timer));
          } else {
            TimeDomain domain = domainAndCoder.getKey();
            context.addIncomingTimerEndpoint(
                familyName, timerCoder, timer -> runner.processTimer(familyName, domain, timer));
          }
        });
    return runner;
  }
}
//////////////////////////////////////////////////////////////////////////////////////////////////
private final PipelineOptions pipelineOptions;
private final String pTransformId;
private final PTransform pTransform;
private final RehydratedComponents rehydratedComponents;
private final DoFn<InputT, OutputT> doFn;
private final DoFnSignature doFnSignature;
private final TupleTag<OutputT> mainOutputTag;
// Value coder of the main input; a WindowedValueCoder wrapper, if present, is stripped.
private final Coder<?> inputCoder;
// Key coder of the main input, or null when the input coder is not a KvCoder.
private final Coder<?> keyCoder;
// The main output coder when it is a SchemaCoder, null otherwise.
private final SchemaCoder<OutputT> mainOutputSchemaCoder;
private final Coder<? extends BoundedWindow> windowCoder;
private final WindowingStrategy<InputT, ?> windowingStrategy;
private final Map<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap;
private final Map<TupleTag<?>, Coder<?>> outputCoders;
// Keyed by timer id or timer family id; the value pairs the time domain with the timer coder.
private final Map<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfos;
private final ParDoPayload parDoPayload;
// Consumers of each output, keyed by the output's local name within the PTransform.
private final ListMultimap<String, FnDataReceiver<WindowedValue<?>>> localNameToConsumer;
private final BundleSplitListener splitListener;
private final BundleFinalizer bundleFinalizer;
private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers;
private final String mainInputId;
private final FnApiStateAccessor<?> stateAccessor;
// Outgoing timer receivers keyed by timer family local name; empty when there are no timers.
private final Map<String, FnDataReceiver<?>> outboundTimerReceivers;
// Null when the ParDoPayload declares no timer family specs.
private final @Nullable FnApiTimerBundleTracker timerBundleTracker;
private final DoFnInvoker<InputT, OutputT> doFnInvoker;
private final StartBundleArgumentProvider startBundleArgumentProvider;
private final ProcessBundleContextBase processContext;
private final OnTimerContext<?> onTimerContext;
private final OnWindowExpirationContext<?> onWindowExpirationContext;
private final FinishBundleArgumentProvider finishBundleArgumentProvider;
private final Duration allowedLateness;
private final String workCompletedShortId;
private final String workRemainingShortId;
/**
* Used to guarantee a consistent view of this {@link FnApiDoFnRunner} while setting up for {@link
* DoFnInvoker#invokeProcessElement} since {@link #trySplitForElementAndRestriction} may access
* internal {@link FnApiDoFnRunner} state concurrently.
*/
private final Object splitLock = new Object();
private final DoFnSchemaInformation doFnSchemaInformation;
private final Map<String, PCollectionView<?>> sideInputMapping;
// The member variables below are only valid for the lifetime of certain methods.
/** Only valid during {@code processElement...} methods, null otherwise. */
private WindowedValue<InputT> currentElement;
// When non-null, this key is returned by getCurrentKey() in preference to the key derived from
// the current element or the current timer.
private Object currentKey;
/**
* Only valid during {@link
* #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
* #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
*/
private List<BoundedWindow> currentWindows;
/**
* Only valid during {@link
* #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
* #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
*/
private int windowStopIndex;
/**
* Only valid during {@link
* #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
* #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
*/
private int windowCurrentIndex;
/**
* Only valid during {@link #processElementForPairWithRestriction}, {@link
* #processElementForSplitRestriction}, and {@link
* #processElementForWindowObservingSizedElementAndRestriction}, null otherwise.
*/
private RestrictionT currentRestriction;
/**
* Only valid during {@link #processElementForSplitRestriction}, and {@link
* #processElementForWindowObservingSizedElementAndRestriction}, null otherwise.
*/
private WatermarkEstimatorStateT currentWatermarkEstimatorState;
/**
* Only valid during {@link #processElementForWindowObservingSizedElementAndRestriction} and
* {@link #processElementForWindowObservingTruncateRestriction}.
*/
private Instant initialWatermark;
/**
* Only valid during {@link #processElementForWindowObservingSizedElementAndRestriction}, null
* otherwise.
*/
private WatermarkEstimators.WatermarkAndStateObserver<WatermarkEstimatorStateT>
currentWatermarkEstimator;
/**
* Only valid during {@code processElementForWindowObserving...} and {@link #processTimer}
* methods, null otherwise.
*/
private BoundedWindow currentWindow;
/**
* Only valid during {@link #processElementForWindowObservingSizedElementAndRestriction}, null
* otherwise.
*/
private RestrictionTracker<RestrictionT, PositionT> currentTracker;
/**
* Only valid during {@link #processTimer} and {@link #processOnWindowExpiration}, null otherwise.
*/
private Timer<?> currentTimer;
/** Only valid during {@link #processTimer}, null otherwise. */
private TimeDomain currentTimeDomain;
/**
* Creates a runner for a single PTransform.
*
* <p>The constructor parses the {@link ParDoPayload} from the transform spec, rehydrates the
* coders and windowing strategies it needs, selects the main-input consumer and process context
* according to the transform URN, and registers start/finish/reset/tear-down callbacks, the
* progress reporter, the state accessor and the outbound timer receivers through the supplied
* functional hooks.
*
* @throws IllegalArgumentException if reading the payload or rehydrating components raises an
* {@link IOException}
* @throws IllegalStateException if the transform URN is not one of the supported URNs
* @throws RuntimeException if the main input name cannot be determined for the transform
*/
FnApiDoFnRunner(
PipelineOptions pipelineOptions,
ShortIdMap shortIds,
BeamFnStateClient beamFnStateClient,
String pTransformId,
PTransform pTransform,
Supplier<String> processBundleInstructionId,
Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens,
Supplier<Cache<?, ?>> bundleCache,
Cache<?, ?> processWideCache,
Map<String, PCollection> pCollections,
Map<String, RunnerApi.Coder> coders,
Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
Consumer<ThrowingRunnable> addStartFunction,
Consumer<ThrowingRunnable> addFinishFunction,
Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> addTearDownFunction,
Function<String, FnDataReceiver<WindowedValue<?>>> getPCollectionConsumer,
BiConsumer<String, FnDataReceiver> addPCollectionConsumer,
BiFunction<String, Coder<Timer<Object>>, FnDataReceiver<Timer<Object>>>
getOutgoingTimersConsumer,
Consumer<BundleProgressReporter> addBundleProgressReporter,
BundleSplitListener splitListener,
BundleFinalizer bundleFinalizer) {
this.pipelineOptions = pipelineOptions;
this.pTransformId = pTransformId;
this.pTransform = pTransform;
ImmutableMap.Builder<TupleTag<?>, SideInputSpec> tagToSideInputSpecMapBuilder =
ImmutableMap.builder();
try {
rehydratedComponents =
RehydratedComponents.forComponents(
RunnerApi.Components.newBuilder()
.putAllCoders(coders)
.putAllPcollections(pCollections)
.putAllWindowingStrategies(windowingStrategies)
.build())
.withPipeline(Pipeline.create());
parDoPayload = ParDoPayload.parseFrom(pTransform.getSpec().getPayload());
doFn = (DoFn) ParDoTranslation.getDoFn(parDoPayload);
doFnSignature = DoFnSignatures.signatureForDoFn(doFn);
// ParDo and process-sized-elements read the main output tag from the payload. The remaining
// splittable URNs must have exactly one output, whose local name becomes the tag.
switch (pTransform.getSpec().getUrn()) {
case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
case PTransformTranslation.PAR_DO_TRANSFORM_URN:
mainOutputTag = (TupleTag) ParDoTranslation.getMainOutputTag(parDoPayload);
break;
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
mainOutputTag =
new TupleTag(Iterables.getOnlyElement(pTransform.getOutputsMap().keySet()));
break;
default:
throw new IllegalStateException(
String.format("Unknown urn: %s", pTransform.getSpec().getUrn()));
}
// The main input is the single transform input that is not declared as a side input.
String mainInputTag =
Iterables.getOnlyElement(
Sets.difference(
pTransform.getInputsMap().keySet(), parDoPayload.getSideInputsMap().keySet()));
PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
Coder<?> maybeWindowedValueInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId());
// TODO: Stop passing windowed value coders within PCollections.
if (maybeWindowedValueInputCoder instanceof WindowedValue.WindowedValueCoder) {
inputCoder = ((WindowedValueCoder) maybeWindowedValueInputCoder).getValueCoder();
} else {
inputCoder = maybeWindowedValueInputCoder;
}
// A key coder is only available when the main input is KV-encoded.
if (inputCoder instanceof KvCoder) {
this.keyCoder = ((KvCoder) inputCoder).getKeyCoder();
} else {
this.keyCoder = null;
}
windowingStrategy =
(WindowingStrategy)
rehydratedComponents.getWindowingStrategy(mainInput.getWindowingStrategyId());
windowCoder = windowingStrategy.getWindowFn().windowCoder();
// Record the value coder of every output, unwrapping WindowedValueCoder where present.
outputCoders = Maps.newHashMap();
for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
TupleTag<?> outputTag = new TupleTag<>(entry.getKey());
RunnerApi.PCollection outputPCollection = pCollections.get(entry.getValue());
Coder<?> outputCoder = rehydratedComponents.getCoder(outputPCollection.getCoderId());
if (outputCoder instanceof WindowedValueCoder) {
outputCoder = ((WindowedValueCoder) outputCoder).getValueCoder();
}
outputCoders.put(outputTag, outputCoder);
}
Coder<OutputT> outputCoder = (Coder<OutputT>) outputCoders.get(mainOutputTag);
mainOutputSchemaCoder =
(outputCoder instanceof SchemaCoder) ? (SchemaCoder<OutputT>) outputCoder : null;
// Build the map from tag id to side input specification
for (Map.Entry<String, RunnerApi.SideInput> entry :
parDoPayload.getSideInputsMap().entrySet()) {
String sideInputTag = entry.getKey();
RunnerApi.SideInput sideInput = entry.getValue();
PCollection sideInputPCollection =
pCollections.get(pTransform.getInputsOrThrow(sideInputTag));
WindowingStrategy sideInputWindowingStrategy =
rehydratedComponents.getWindowingStrategy(
sideInputPCollection.getWindowingStrategyId());
tagToSideInputSpecMapBuilder.put(
new TupleTag<>(entry.getKey()),
SideInputSpec.create(
sideInput.getAccessPattern().getUrn(),
rehydratedComponents.getCoder(sideInputPCollection.getCoderId()),
sideInputWindowingStrategy.getWindowFn().windowCoder(),
PCollectionViewTranslation.viewFnFromProto(entry.getValue().getViewFn()),
PCollectionViewTranslation.windowMappingFnFromProto(
entry.getValue().getWindowMappingFn())));
}
ImmutableMap.Builder<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfosBuilder =
ImmutableMap.builder();
// Extract out relevant TimerFamilySpec information in preparation for execution.
for (Map.Entry<String, TimerFamilySpec> entry :
parDoPayload.getTimerFamilySpecsMap().entrySet()) {
// The timer family spec map key is either from timerId of timer declaration or
// timerFamilyId from timer family declaration.
String timerIdOrTimerFamilyId = entry.getKey();
TimeDomain timeDomain = translateTimeDomain(entry.getValue().getTimeDomain());
Coder<Timer<Object>> timerCoder =
(Coder) rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId());
timerFamilyInfosBuilder.put(timerIdOrTimerFamilyId, KV.of(timeDomain, timerCoder));
}
timerFamilyInfos = timerFamilyInfosBuilder.build();
this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
this.allowedLateness =
rehydratedComponents
.getPCollection(pTransform.getInputsOrThrow(mainInputId))
.getWindowingStrategy()
.getAllowedLateness();
} catch (IOException exn) {
throw new IllegalArgumentException("Malformed ParDoPayload", exn);
}
// Collect the downstream consumers of each output, keyed by the output's local name.
ImmutableListMultimap.Builder<String, FnDataReceiver<WindowedValue<?>>>
localNameToConsumerBuilder = ImmutableListMultimap.builder();
for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
localNameToConsumerBuilder.putAll(
entry.getKey(), getPCollectionConsumer.apply(entry.getValue()));
}
localNameToConsumer = localNameToConsumerBuilder.build();
tagToSideInputSpecMap = tagToSideInputSpecMapBuilder.build();
this.splitListener = splitListener;
this.bundleFinalizer = bundleFinalizer;
this.onTimerContext = new OnTimerContext();
this.onWindowExpirationContext = new OnWindowExpirationContext<>();
this.mainOutputConsumers =
(Collection<FnDataReceiver<WindowedValue<OutputT>>>)
(Collection) localNameToConsumer.get(mainOutputTag.getId());
this.doFnSchemaInformation = ParDoTranslation.getSchemaInformation(parDoPayload);
this.sideInputMapping = ParDoTranslation.getSideInputMapping(parDoPayload);
this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions);
this.startBundleArgumentProvider = new StartBundleArgumentProvider();
// Register the appropriate handlers.
// Only ParDo and process-sized-elements register startBundle; every other URN falls through
// to the no-op default.
switch (pTransform.getSpec().getUrn()) {
case PTransformTranslation.PAR_DO_TRANSFORM_URN:
case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
addStartFunction.accept(this::startBundle);
break;
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
// startBundle should not be invoked
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
// startBundle should not be invoked
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
// startBundle should not be invoked
default:
// no-op
}
String mainInput;
try {
mainInput = ParDoTranslation.getMainInputName(pTransform);
} catch (IOException e) {
throw new RuntimeException(e);
}
// Pick the main-input consumer and process context per URN. In every case the
// window-observing variant is chosen when a relevant DoFn method observes the window or when
// the transform has side inputs.
final FnDataReceiver<WindowedValue> mainInputConsumer;
switch (pTransform.getSpec().getUrn()) {
case PTransformTranslation.PAR_DO_TRANSFORM_URN:
if (doFnSignature.processElement().observesWindow() || !sideInputMapping.isEmpty()) {
mainInputConsumer = this::processElementForWindowObservingParDo;
this.processContext = new WindowObservingProcessBundleContext();
} else {
mainInputConsumer = this::processElementForParDo;
this.processContext = new NonWindowObservingProcessBundleContext();
}
break;
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
if (doFnSignature.getInitialRestriction().observesWindow()
|| (doFnSignature.getInitialWatermarkEstimatorState() != null
&& doFnSignature.getInitialWatermarkEstimatorState().observesWindow())
|| !sideInputMapping.isEmpty()) {
mainInputConsumer = this::processElementForWindowObservingPairWithRestriction;
this.processContext = new WindowObservingProcessBundleContext();
} else {
mainInputConsumer = this::processElementForPairWithRestriction;
this.processContext = new NonWindowObservingProcessBundleContext();
}
break;
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
if ((doFnSignature.splitRestriction() != null
&& doFnSignature.splitRestriction().observesWindow())
|| (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
|| (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
|| !sideInputMapping.isEmpty()) {
mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
this.processContext =
new SizedRestrictionWindowObservingProcessBundleContext(
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
} else {
mainInputConsumer = this::processElementForSplitRestriction;
this.processContext =
new SizedRestrictionNonWindowObservingProcessBundleContext(
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
}
break;
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
if ((doFnSignature.truncateRestriction() != null
&& doFnSignature.truncateRestriction().observesWindow())
|| (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
|| (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
|| !sideInputMapping.isEmpty()) {
// Only forward split/progress when the only consumer is splittable.
if (mainOutputConsumers.size() == 1
&& Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) {
mainInputConsumer =
new SplittableFnDataReceiver() {
private final HandlesSplits splitDelegate =
(HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
@Override
public void accept(WindowedValue input) throws Exception {
processElementForWindowObservingTruncateRestriction(input);
}
@Override
public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
return trySplitForWindowObservingTruncateRestriction(
fractionOfRemainder, splitDelegate);
}
@Override
public double getProgress() {
Progress progress =
FnApiDoFnRunner.this.getProgressFromWindowObservingTruncate(
splitDelegate.getProgress());
if (progress != null) {
double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
if (totalWork > 0) {
return progress.getWorkCompleted() / totalWork;
}
}
// No progress information, or no work at all: report zero.
return 0;
}
};
} else {
mainInputConsumer = this::processElementForWindowObservingTruncateRestriction;
}
this.processContext =
new SizedRestrictionWindowObservingProcessBundleContext(
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN);
} else {
// Only forward split/progress when the only consumer is splittable.
if (mainOutputConsumers.size() == 1
&& Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) {
mainInputConsumer =
new SplittableFnDataReceiver() {
private final HandlesSplits splitDelegate =
(HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
@Override
public void accept(WindowedValue input) throws Exception {
processElementForTruncateRestriction(input);
}
@Override
public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
return splitDelegate.trySplit(fractionOfRemainder);
}
@Override
public double getProgress() {
return splitDelegate.getProgress();
}
};
} else {
mainInputConsumer = this::processElementForTruncateRestriction;
}
this.processContext =
new SizedRestrictionNonWindowObservingProcessBundleContext(
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN);
}
break;
case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
// Both branches below currently install the same window-observing receiver and context;
// the else branch is a placeholder for the optimization named in its TODO.
if (doFnSignature.processElement().observesWindow()
|| (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
|| (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
|| (doFnSignature.newWatermarkEstimator() != null
&& doFnSignature.newWatermarkEstimator().observesWindow())
|| !sideInputMapping.isEmpty()) {
mainInputConsumer =
new SplittableFnDataReceiver() {
@Override
public void accept(WindowedValue input) throws Exception {
processElementForWindowObservingSizedElementAndRestriction(input);
}
};
this.processContext = new WindowObservingProcessBundleContext();
} else {
mainInputConsumer =
new SplittableFnDataReceiver() {
@Override
public void accept(WindowedValue input) throws Exception {
// TODO(BEAM-10303): Create a variant which is optimized to not observe the
// windows.
processElementForWindowObservingSizedElementAndRestriction(input);
}
};
this.processContext = new WindowObservingProcessBundleContext();
}
break;
default:
throw new IllegalStateException("Unknown urn: " + pTransform.getSpec().getUrn());
}
addPCollectionConsumer.accept(pTransform.getInputsOrThrow(mainInput), mainInputConsumer);
this.finishBundleArgumentProvider = new FinishBundleArgumentProvider();
// finishBundle is registered for the same two URNs as startBundle above.
switch (pTransform.getSpec().getUrn()) {
case PTransformTranslation.PAR_DO_TRANSFORM_URN:
case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
addFinishFunction.accept(this::finishBundle);
break;
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
// finishBundle should not be invoked
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
// finishBundle should not be invoked
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
// finishBundle should not be invoked
default:
// no-op
}
addTearDownFunction.accept(this::tearDown);
// Short ids for the work-completed and work-remaining progress metrics of this transform.
workCompletedShortId =
shortIds.getOrCreateShortId(
new SimpleMonitoringInfoBuilder()
.setUrn(MonitoringInfoConstants.Urns.WORK_COMPLETED)
.setType(MonitoringInfoConstants.TypeUrns.PROGRESS_TYPE)
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, pTransformId)
.build());
workRemainingShortId =
shortIds.getOrCreateShortId(
new SimpleMonitoringInfoBuilder()
.setUrn(MonitoringInfoConstants.Urns.WORK_REMAINING)
.setType(MonitoringInfoConstants.TypeUrns.PROGRESS_TYPE)
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, pTransformId)
.build());
// A progress reporter is only registered for the process-sized-elements URN.
switch (pTransform.getSpec().getUrn()) {
case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
addBundleProgressReporter.accept(
new BundleProgressReporter() {
@Override
public void updateIntermediateMonitoringData(Map<String, ByteString> monitoringData) {
Progress progress = getProgress();
if (progress == null) {
return;
}
ByteString encodedWorkCompleted, encodedWorkRemaining;
try {
encodedWorkCompleted = encodeProgress(progress.getWorkCompleted());
encodedWorkRemaining = encodeProgress(progress.getWorkRemaining());
} catch (IOException e) {
throw new RuntimeException("Failed to encode progress", e);
}
monitoringData.put(workCompletedShortId, encodedWorkCompleted);
monitoringData.put(workRemainingShortId, encodedWorkRemaining);
}
@Override
public void updateFinalMonitoringData(Map<String, ByteString> monitoringData) {
// No elements will be inflight when the progress completes.
}
@Override
public void reset() {}
// Encodes the value as a single-element iterable of doubles.
private ByteString encodeProgress(double value) throws IOException {
ByteStringOutputStream output = new ByteStringOutputStream();
IterableCoder.of(DoubleCoder.of()).encode(Arrays.asList(value), output);
return output.toByteString();
}
});
break;
default:
// no-op
}
// The state accessor reads the current key and window lazily through the suppliers below.
this.stateAccessor =
new FnApiStateAccessor(
pipelineOptions,
pTransformId,
processBundleInstructionId,
cacheTokens,
bundleCache,
processWideCache,
tagToSideInputSpecMap,
beamFnStateClient,
keyCoder,
windowCoder,
this::getCurrentKey,
() -> currentWindow);
// Register as a consumer for each timer.
this.outboundTimerReceivers = new HashMap<>();
// Without any timer families there is nothing to track, so the tracker stays null.
if (timerFamilyInfos.isEmpty()) {
this.timerBundleTracker = null;
} else {
this.timerBundleTracker =
new FnApiTimerBundleTracker(
keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow);
addResetFunction.accept(timerBundleTracker::reset);
for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfo :
timerFamilyInfos.entrySet()) {
String localName = timerFamilyInfo.getKey();
Coder<Timer<Object>> timerCoder = timerFamilyInfo.getValue().getValue();
outboundTimerReceivers.put(
localName, getOutgoingTimersConsumer.apply(localName, timerCoder));
}
}
}
/**
 * Resolves the key for the current processing context.
 *
 * <p>An explicitly set {@code currentKey} wins. Otherwise the key is taken from the current
 * element, which must then be a {@link KV}, or failing that from the current timer's user key.
 *
 * @return the resolved key, or {@code null} when there is no key, element or timer in scope
 * @throws IllegalStateException if there is a current element whose value is not a {@link KV}
 */
private Object getCurrentKey() {
  if (currentKey != null) {
    return currentKey;
  }
  // TODO: Maybe memoize the key?
  if (currentElement == null) {
    // No element in flight: fall back to the timer's user key, if a timer is being processed.
    return currentTimer == null ? null : currentTimer.getUserKey();
  }
  Object elementValue = currentElement.getValue();
  checkState(
      elementValue instanceof KV,
      "Accessing state in unkeyed context. Current element is not a KV: %s.",
      elementValue);
  return ((KV) elementValue).getKey();
}
/** Invokes the DoFn's start-bundle method using the start-bundle argument provider. */
private void startBundle() {
  this.doFnInvoker.invokeStartBundle(this.startBundleArgumentProvider);
}
/**
 * Processes one element with the DoFn's process-element method.
 *
 * <p>The element is published through {@code currentElement} for the duration of the call and is
 * always cleared afterwards, even if the user code throws.
 *
 * @param elem the windowed input element to process
 */
private void processElementForParDo(WindowedValue<InputT> elem) {
  this.currentElement = elem;
  try {
    this.doFnInvoker.invokeProcessElement(this.processContext);
  } finally {
    this.currentElement = null;
  }
}
/**
 * Processes one element once per window it belongs to.
 *
 * <p>{@code currentWindow} is updated before each invocation; both the element and the window are
 * cleared when processing ends, normally or exceptionally.
 *
 * @param elem the windowed input element to process
 */
private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
  currentElement = elem;
  try {
    for (BoundedWindow window : elem.getWindows()) {
      currentWindow = window;
      doFnInvoker.invokeProcessElement(processContext);
    }
  } finally {
    currentElement = null;
    currentWindow = null;
  }
}
/**
 * Pairs an element with its initial restriction and initial watermark estimator state, emitting
 * {@code KV(element, KV(restriction, estimatorState))} to the main output consumers while keeping
 * the element's existing windowing metadata.
 *
 * @param elem the windowed input element to pair
 */
private void processElementForPairWithRestriction(WindowedValue<InputT> elem) {
  currentElement = elem;
  try {
    currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
    Object initialEstimatorState =
        doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext);
    outputTo(
        mainOutputConsumers,
        (WindowedValue)
            elem.withValue(
                KV.of(elem.getValue(), KV.of(currentRestriction, initialEstimatorState))));
  } finally {
    currentElement = null;
    currentRestriction = null;
  }
  this.stateAccessor.finalizeState();
}
/**
 * Window-observing variant of pairing an element with its initial restriction: the initial
 * restriction and watermark estimator state are computed separately for every window of the
 * element, and one output per window is emitted to the main output consumers.
 *
 * @param elem the windowed input element to pair
 */
private void processElementForWindowObservingPairWithRestriction(WindowedValue<InputT> elem) {
  currentElement = elem;
  try {
    for (BoundedWindow window : elem.getWindows()) {
      currentWindow = window;
      currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
      Object initialEstimatorState =
          doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext);
      outputTo(
          mainOutputConsumers,
          (WindowedValue)
              WindowedValue.of(
                  KV.of(elem.getValue(), KV.of(currentRestriction, initialEstimatorState)),
                  elem.getTimestamp(),
                  window,
                  elem.getPane()));
    }
  } finally {
    currentElement = null;
    currentWindow = null;
    currentRestriction = null;
  }
  this.stateAccessor.finalizeState();
}
/**
 * Invokes the DoFn's split-restriction method for an element that has already been paired with a
 * restriction and watermark estimator state.
 *
 * @param elem the input shaped as {@code KV(element, KV(restriction, estimatorState))}
 */
private void processElementForSplitRestriction(
    WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
  KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>> elementAndRestriction = elem.getValue();
  currentElement = elem.withValue(elementAndRestriction.getKey());
  KV<RestrictionT, WatermarkEstimatorStateT> restrictionAndState =
      elementAndRestriction.getValue();
  currentRestriction = restrictionAndState.getKey();
  currentWatermarkEstimatorState = restrictionAndState.getValue();
  // The tracker is wrapped with an observer that ignores every claim notification.
  currentTracker =
      RestrictionTrackers.observe(
          doFnInvoker.invokeNewTracker(processContext),
          new ClaimObserver<PositionT>() {
            @Override
            public void onClaimed(PositionT position) {}

            @Override
            public void onClaimFailed(PositionT position) {}
          });
  try {
    doFnInvoker.invokeSplitRestriction(processContext);
  } finally {
    currentTracker = null;
    currentWatermarkEstimatorState = null;
    currentRestriction = null;
    currentElement = null;
  }
  this.stateAccessor.finalizeState();
}
/**
 * Window-observing variant of split-restriction: a fresh tracker is created and the DoFn's
 * split-restriction method is invoked once for every window of the element.
 *
 * @param elem the input shaped as {@code KV(element, KV(restriction, estimatorState))}
 */
private void processElementForWindowObservingSplitRestriction(
    WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
  KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>> elementAndRestriction = elem.getValue();
  currentElement = elem.withValue(elementAndRestriction.getKey());
  KV<RestrictionT, WatermarkEstimatorStateT> restrictionAndState =
      elementAndRestriction.getValue();
  currentRestriction = restrictionAndState.getKey();
  currentWatermarkEstimatorState = restrictionAndState.getValue();
  try {
    for (BoundedWindow window : elem.getWindows()) {
      currentWindow = window;
      // The tracker is wrapped with an observer that ignores every claim notification.
      currentTracker =
          RestrictionTrackers.observe(
              doFnInvoker.invokeNewTracker(processContext),
              new ClaimObserver<PositionT>() {
                @Override
                public void onClaimed(PositionT position) {}

                @Override
                public void onClaimFailed(PositionT position) {}
              });
      doFnInvoker.invokeSplitRestriction(processContext);
    }
  } finally {
    currentTracker = null;
    currentWindow = null;
    currentWatermarkEstimatorState = null;
    currentRestriction = null;
    currentElement = null;
  }
  this.stateAccessor.finalizeState();
}
/**
 * Invokes the DoFn's truncate-restriction method for a sized element-and-restriction and outputs
 * the truncated restriction when the DoFn returns one.
 *
 * @param elem the input shaped as {@code KV(KV(element, KV(restriction, estimatorState)), size)}
 */
private void processElementForTruncateRestriction(
    WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
  KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>> elementAndRestriction =
      elem.getValue().getKey();
  currentElement = elem.withValue(elementAndRestriction.getKey());
  KV<RestrictionT, WatermarkEstimatorStateT> restrictionAndState =
      elementAndRestriction.getValue();
  currentRestriction = restrictionAndState.getKey();
  currentWatermarkEstimatorState = restrictionAndState.getValue();
  // The tracker is wrapped with an observer that ignores every claim notification.
  currentTracker =
      RestrictionTrackers.observe(
          doFnInvoker.invokeNewTracker(processContext),
          new ClaimObserver<PositionT>() {
            @Override
            public void onClaimed(PositionT position) {}

            @Override
            public void onClaimFailed(PositionT position) {}
          });
  try {
    TruncateResult<OutputT> truncated = doFnInvoker.invokeTruncateRestriction(processContext);
    // A null result means there is nothing to emit for this restriction.
    if (truncated != null) {
      processContext.output(truncated.getTruncatedRestriction());
    }
  } finally {
    currentWatermarkEstimatorState = null;
    currentRestriction = null;
    currentElement = null;
    currentTracker = null;
  }
  this.stateAccessor.finalizeState();
}
/**
 * Window-observing variant of truncate-restriction: the restriction is truncated once per window
 * of the element, and each non-null truncated restriction is output.
 *
 * <p>Per-window state (restriction, estimator state, window, tracker, watermark estimator and
 * initial watermark) is installed while holding {@code splitLock} so that a concurrent split
 * request observes a consistent snapshot. The lock is released before user code is invoked.
 *
 * <p>The state is also cleared while holding {@code splitLock}. Split requests check {@code
 * currentWindow} and then dereference {@code currentElement} under that lock; clearing the fields
 * without the lock would let a split observe a non-null window together with an already-cleared
 * element.
 *
 * @param elem the input shaped as {@code KV(KV(element, KV(restriction, estimatorState)), size)}
 */
private void processElementForWindowObservingTruncateRestriction(
    WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
  currentElement = elem.withValue(elem.getValue().getKey().getKey());
  try {
    windowCurrentIndex = -1;
    windowStopIndex = currentElement.getWindows().size();
    currentWindows = ImmutableList.copyOf(currentElement.getWindows());
    while (true) {
      synchronized (splitLock) {
        windowCurrentIndex++;
        // windowStopIndex may have been lowered by a split since the previous iteration.
        if (windowCurrentIndex >= windowStopIndex) {
          break;
        }
        currentRestriction = elem.getValue().getKey().getValue().getKey();
        currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
        currentWindow = currentWindows.get(windowCurrentIndex);
        currentTracker =
            RestrictionTrackers.observe(
                doFnInvoker.invokeNewTracker(processContext),
                new ClaimObserver<PositionT>() {
                  @Override
                  public void onClaimed(PositionT position) {}

                  @Override
                  public void onClaimFailed(PositionT position) {}
                });
        currentWatermarkEstimator =
            WatermarkEstimators.threadSafe(
                doFnInvoker.invokeNewWatermarkEstimator(processContext));
        initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
      }
      // User code runs outside of splitLock.
      TruncateResult<OutputT> truncatedRestriction =
          doFnInvoker.invokeTruncateRestriction(processContext);
      if (truncatedRestriction != null) {
        processContext.output(truncatedRestriction.getTruncatedRestriction());
      }
    }
  } finally {
    // Clear under the lock so a concurrent split never sees partially cleared state.
    synchronized (splitLock) {
      currentTracker = null;
      currentElement = null;
      currentRestriction = null;
      currentWatermarkEstimatorState = null;
      currentWatermarkEstimator = null;
      currentWindow = null;
      currentWindows = null;
      initialWatermark = null;
    }
  }
  this.stateAccessor.finalizeState();
}
/**
 * Internal holder for the primary and residual roots of a split, expressed as input elements.
 *
 * <p>Any of the four roots may be {@code null} when the corresponding part of the split is absent.
 */
@AutoValue
@AutoValue.CopyAnnotations
abstract static class WindowedSplitResult {
  /** Primary root covering the windows that are considered fully processed, if any. */
  public abstract @Nullable WindowedValue<?> getPrimaryInFullyProcessedWindowsRoot();

  /** Primary part of the element-level split in the current window, if any. */
  public abstract @Nullable WindowedValue<?> getPrimarySplitRoot();

  /** Residual part of the element-level split in the current window, if any. */
  public abstract @Nullable WindowedValue<?> getResidualSplitRoot();

  /** Residual root covering the windows that have not been processed, if any. */
  public abstract @Nullable WindowedValue<?> getResidualInUnprocessedWindowsRoot();

  /** Creates a result from the four roots, in the same order as the accessors above. */
  public static WindowedSplitResult forRoots(
      WindowedValue<?> primaryInFullyProcessedWindowsRoot,
      WindowedValue<?> primarySplitRoot,
      WindowedValue<?> residualSplitRoot,
      WindowedValue<?> residualInUnprocessedWindowsRoot) {
    return new AutoValue_FnApiDoFnRunner_WindowedSplitResult(
        primaryInFullyProcessedWindowsRoot,
        primarySplitRoot,
        residualSplitRoot,
        residualInUnprocessedWindowsRoot);
  }
}
/**
 * Bundles the outcome of a split computation: the window-level split, an optional downstream
 * split, and the window stop index that applies after the split.
 */
@AutoValue
@AutoValue.CopyAnnotations
abstract static class SplitResultsWithStopIndex {
  /** The split expressed in terms of this transform's own input, if any. */
  public abstract @Nullable WindowedSplitResult getWindowSplit();

  /** The split produced by a downstream split delegate, if any. */
  public abstract HandlesSplits.@Nullable SplitResult getDownstreamSplit();

  /** The exclusive index at which window iteration should stop after the split. */
  public abstract int getNewWindowStopIndex();

  /** Creates a result from its three components, in the same order as the accessors above. */
  public static SplitResultsWithStopIndex of(
      WindowedSplitResult windowSplit,
      HandlesSplits.SplitResult downstreamSplit,
      int newWindowStopIndex) {
    return new AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex(
        windowSplit, downstreamSplit, newWindowStopIndex);
  }
}
/**
 * Processes a sized element-and-restriction once per window of the element.
 *
 * <p>For every window, the per-window state (restriction, watermark estimator state, window,
 * tracker, watermark estimator and initial watermark) is installed while holding {@code
 * splitLock}; the DoFn's process-element method is then invoked without the lock held. If the
 * DoFn asks to resume later, the remaining work is checkpointed through {@code
 * trySplitForElementAndRestriction} with a fraction of 0 and forwarded to the split listener.
 *
 * <p>All per-element state is cleared under {@code splitLock} when the method exits, whether
 * normally or exceptionally.
 *
 * @param elem the input shaped as {@code KV(KV(element, KV(restriction, estimatorState)), size)}
 */
private void processElementForWindowObservingSizedElementAndRestriction(
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
currentElement = elem.withValue(elem.getValue().getKey().getKey());
try {
// Start before the first window; the index is advanced at the top of each iteration.
windowCurrentIndex = -1;
windowStopIndex = currentElement.getWindows().size();
currentWindows = ImmutableList.copyOf(currentElement.getWindows());
while (true) {
synchronized (splitLock) {
windowCurrentIndex++;
// windowStopIndex is reassigned by trySplitForElementAndRestriction, so it is re-read here.
if (windowCurrentIndex >= windowStopIndex) {
return;
}
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
currentWindow = currentWindows.get(windowCurrentIndex);
// The tracker is wrapped with an observer that ignores every claim notification.
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
@Override
public void onClaimed(PositionT position) {}
@Override
public void onClaimFailed(PositionT position) {}
});
currentWatermarkEstimator =
WatermarkEstimators.threadSafe(
doFnInvoker.invokeNewWatermarkEstimator(processContext));
// Remember the watermark at the start of this window; it is passed to constructSplitResult.
initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
}
// It is important to ensure that {@code splitLock} is not held during #invokeProcessElement
DoFn.ProcessContinuation continuation = doFnInvoker.invokeProcessElement(processContext);
// Ensure that all the work is done if the user tells us that they don't want to
// resume processing.
if (!continuation.shouldResume()) {
currentTracker.checkDone();
continue;
}
// Attempt to checkpoint the current restriction.
HandlesSplits.SplitResult splitResult =
trySplitForElementAndRestriction(0, continuation.resumeDelay());
/**
* After the user has chosen to resume processing later, either the restriction is already
* done and the user unknowingly claimed the last element or the Runner may have stolen the
* remainder of work through a split call so the above trySplit may return null. If so, the
* current restriction must be done.
*/
if (splitResult == null) {
currentTracker.checkDone();
continue;
}
// Forward the split to the bundle level split listener.
splitListener.split(splitResult.getPrimaryRoots(), splitResult.getResidualRoots());
}
} finally {
// Clear under the lock so that split and progress requests see either full state or none.
synchronized (splitLock) {
currentElement = null;
currentRestriction = null;
currentWatermarkEstimatorState = null;
currentWindow = null;
currentTracker = null;
currentWatermarkEstimator = null;
currentWindows = null;
initialWatermark = null;
}
}
}
/**
 * Base receiver that answers split and progress requests on behalf of the enclosing runner while
 * leaving the choice of where input elements are delivered to the concrete subclass.
 */
private abstract class SplittableFnDataReceiver
    implements HandlesSplits, FnDataReceiver<WindowedValue> {
  /** Attempts a split of the element and restriction currently in flight, with no resume delay. */
  @Override
  public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
    return trySplitForElementAndRestriction(fractionOfRemainder, Duration.ZERO);
  }

  /**
   * Returns the completed fraction of the work currently in flight, or 0 when no progress is
   * available or the total amount of work is not positive.
   */
  @Override
  public double getProgress() {
    Progress runnerProgress = FnApiDoFnRunner.this.getProgress();
    if (runnerProgress == null) {
      return 0;
    }
    double completed = runnerProgress.getWorkCompleted();
    double total = completed + runnerProgress.getWorkRemaining();
    return total > 0 ? completed / total : 0;
  }
}
/**
 * Reports the progress of the current tracker scaled across the windows being processed.
 *
 * @return the scaled progress, or {@code null} when the current tracker does not report progress
 */
private Progress getProgress() {
  synchronized (splitLock) {
    if (!(currentTracker instanceof RestrictionTracker.HasProgress)) {
      return null;
    }
    Progress trackerProgress = ((HasProgress) currentTracker).getProgress();
    return scaleProgress(trackerProgress, windowCurrentIndex, windowStopIndex);
  }
}
/**
 * Converts a completed fraction of the current element into progress scaled across the windows
 * being processed.
 *
 * @param elementCompleted the completed fraction of the current element
 * @return the scaled progress, or {@code null} when no window is currently being processed
 */
private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
  synchronized (splitLock) {
    if (currentWindow == null) {
      return null;
    }
    Progress elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
    return scaleProgress(elementProgress, windowCurrentIndex, windowStopIndex);
  }
}
/**
 * Scales the progress of a single window to cover all windows up to the stop index.
 *
 * <p>Every window is assumed to carry the same total amount of work as the supplied progress.
 * Windows before {@code currentWindowIndex} count as completed and windows after it, up to but
 * excluding {@code stopWindowIndex}, count as remaining.
 *
 * @param progress progress within the current window
 * @param currentWindowIndex index of the window currently being processed
 * @param stopWindowIndex exclusive index at which window processing stops
 * @return the progress across all windows
 */
@VisibleForTesting
static Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) {
  double completedInWindow = progress.getWorkCompleted();
  double remainingInWindow = progress.getWorkRemaining();
  double workPerWindow = completedInWindow + remainingInWindow;
  int windowsAfterCurrent = stopWindowIndex - currentWindowIndex - 1;
  return Progress.from(
      workPerWindow * currentWindowIndex + completedInWindow,
      workPerWindow * windowsAfterCurrent + remainingInWindow);
}
/**
 * Attaches a size to every root of the given split result.
 *
 * <p>Both whole-window roots receive the size of the current restriction, while the primary and
 * residual split roots receive the size of their own restriction. Each root value is rewritten as
 * {@code KV(originalValue, size)}; absent roots stay {@code null} and no size is computed for
 * them.
 *
 * @param splitResult the split whose roots should be sized
 * @param errorContext context string handed to the delegating argument provider
 * @return a new split result whose non-null roots carry their size
 */
private WindowedSplitResult calculateRestrictionSize(
    WindowedSplitResult splitResult, String errorContext) {
  @Nullable
  WindowedValue<?> processedWindowsRoot = splitResult.getPrimaryInFullyProcessedWindowsRoot();
  @Nullable WindowedValue<?> primaryRoot = splitResult.getPrimarySplitRoot();
  @Nullable WindowedValue<?> residualRoot = splitResult.getResidualSplitRoot();
  @Nullable
  WindowedValue<?> unprocessedWindowsRoot = splitResult.getResidualInUnprocessedWindowsRoot();

  // Size of the unsplit restriction; only needed when a whole-window root exists.
  double fullSize = 0;
  if (unprocessedWindowsRoot != null || processedWindowsRoot != null) {
    fullSize =
        doFnInvoker.invokeGetSize(
            new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
              @Override
              public Object restriction() {
                return currentRestriction;
              }

              @Override
              public RestrictionTracker<?, ?> restrictionTracker() {
                return doFnInvoker.invokeNewTracker(this);
              }
            });
  }

  // Size of the primary half of the element split.
  double primarySize = 0;
  if (primaryRoot != null) {
    primarySize =
        doFnInvoker.invokeGetSize(
            new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
              @Override
              public Object restriction() {
                return ((KV<?, KV<?, ?>>) primaryRoot.getValue()).getValue().getKey();
              }

              @Override
              public RestrictionTracker<?, ?> restrictionTracker() {
                return doFnInvoker.invokeNewTracker(this);
              }
            });
  }

  // Size of the residual half of the element split.
  double residualSize = 0;
  if (residualRoot != null) {
    residualSize =
        doFnInvoker.invokeGetSize(
            new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
              @Override
              public Object restriction() {
                return ((KV<?, KV<?, ?>>) residualRoot.getValue()).getValue().getKey();
              }

              @Override
              public RestrictionTracker<?, ?> restrictionTracker() {
                return doFnInvoker.invokeNewTracker(this);
              }
            });
  }

  @Nullable
  WindowedValue<?> sizedProcessedWindowsRoot =
      processedWindowsRoot == null
          ? null
          : WindowedValue.of(
              KV.of(processedWindowsRoot.getValue(), fullSize),
              processedWindowsRoot.getTimestamp(),
              processedWindowsRoot.getWindows(),
              processedWindowsRoot.getPane());
  @Nullable
  WindowedValue<?> sizedPrimaryRoot =
      primaryRoot == null
          ? null
          : WindowedValue.of(
              KV.of(primaryRoot.getValue(), primarySize),
              primaryRoot.getTimestamp(),
              primaryRoot.getWindows(),
              primaryRoot.getPane());
  @Nullable
  WindowedValue<?> sizedResidualRoot =
      residualRoot == null
          ? null
          : WindowedValue.of(
              KV.of(residualRoot.getValue(), residualSize),
              residualRoot.getTimestamp(),
              residualRoot.getWindows(),
              residualRoot.getPane());
  @Nullable
  WindowedValue<?> sizedUnprocessedWindowsRoot =
      unprocessedWindowsRoot == null
          ? null
          : WindowedValue.of(
              KV.of(unprocessedWindowsRoot.getValue(), fullSize),
              unprocessedWindowsRoot.getTimestamp(),
              unprocessedWindowsRoot.getWindows(),
              unprocessedWindowsRoot.getPane());

  return WindowedSplitResult.forRoots(
      sizedProcessedWindowsRoot, sizedPrimaryRoot, sizedResidualRoot, sizedUnprocessedWindowsRoot);
}
/**
 * Attempts to split the element currently being truncated, delegating any element-level split to
 * {@code splitDelegate}.
 *
 * <p>The split is computed while holding {@code splitLock}. The initial watermark is captured
 * under the same lock, because the processing thread overwrites or clears {@code
 * initialWatermark} once it moves on to the next window or finishes the element; reading the
 * field after releasing the lock could pair this split with another window's watermark or with
 * {@code null}.
 *
 * @param fractionOfRemainder the fraction of the remaining work the primary should keep
 * @param splitDelegate the downstream receiver that performs the element-level split
 * @return the split result, or {@code null} when no truncate call is in progress or no split
 *     could be computed
 */
private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
    double fractionOfRemainder, HandlesSplits splitDelegate) {
  WindowedSplitResult windowedSplitResult = null;
  HandlesSplits.SplitResult downstreamSplitResult = null;
  Instant initialWatermarkForSplit;
  synchronized (splitLock) {
    // There is nothing to split if we are between truncate processing calls.
    if (currentWindow == null) {
      return null;
    }
    // Snapshot the watermark that belongs to the window this split is computed for.
    initialWatermarkForSplit = initialWatermark;
    SplitResultsWithStopIndex splitResult =
        computeSplitForProcessOrTruncate(
            currentElement,
            currentRestriction,
            currentWindow,
            currentWindows,
            currentWatermarkEstimatorState,
            fractionOfRemainder,
            null,
            splitDelegate,
            null,
            windowCurrentIndex,
            windowStopIndex);
    if (splitResult == null) {
      return null;
    }
    windowStopIndex = splitResult.getNewWindowStopIndex();
    windowedSplitResult =
        calculateRestrictionSize(
            splitResult.getWindowSplit(),
            PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN + "/GetSize");
    downstreamSplitResult = splitResult.getDownstreamSplit();
  }
  // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
  // same as the SDF/Process transform.
  Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
  return constructSplitResult(
      windowedSplitResult,
      downstreamSplitResult,
      fullInputCoder,
      initialWatermarkForSplit,
      null,
      pTransformId,
      mainInputId,
      pTransform.getOutputsMap().keySet(),
      null);
}
/**
 * Builds the four roots of a window-aware split.
 *
 * <p>Windows in {@code [0, toIndex)} form the fully processed primary and windows in {@code
 * [fromIndex, stopWindowIndex)} form the unprocessed residual; both reuse the current restriction
 * and watermark estimator state. When {@code splitResult} is non-null, its primary and residual
 * become the element split roots in {@code currentWindow}, with the residual carrying the state
 * from {@code watermarkAndState}.
 *
 * @return the split result; roots that do not apply are {@code null}
 */
private static <WatermarkEstimatorStateT> WindowedSplitResult computeWindowSplitResult(
    WindowedValue currentElement,
    Object currentRestriction,
    BoundedWindow currentWindow,
    List<BoundedWindow> windows,
    WatermarkEstimatorStateT currentWatermarkEstimatorState,
    int toIndex,
    int fromIndex,
    int stopWindowIndex,
    SplitResult<?> splitResult,
    KV<Instant, WatermarkEstimatorStateT> watermarkAndState) {
  List<BoundedWindow> processedWindows = windows.subList(0, toIndex);
  List<BoundedWindow> unprocessedWindows = windows.subList(fromIndex, stopWindowIndex);

  @Nullable WindowedValue<?> primaryInProcessedWindows = null;
  if (!processedWindows.isEmpty()) {
    primaryInProcessedWindows =
        WindowedValue.of(
            KV.of(
                currentElement.getValue(),
                KV.of(currentRestriction, currentWatermarkEstimatorState)),
            currentElement.getTimestamp(),
            processedWindows,
            currentElement.getPane());
  }

  @Nullable WindowedValue<?> primarySplit = null;
  @Nullable WindowedValue<?> residualSplit = null;
  if (splitResult != null) {
    primarySplit =
        WindowedValue.of(
            KV.of(
                currentElement.getValue(),
                KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)),
            currentElement.getTimestamp(),
            currentWindow,
            currentElement.getPane());
    residualSplit =
        WindowedValue.of(
            KV.of(
                currentElement.getValue(),
                KV.of(splitResult.getResidual(), watermarkAndState.getValue())),
            currentElement.getTimestamp(),
            currentWindow,
            currentElement.getPane());
  }

  @Nullable WindowedValue<?> residualInUnprocessedWindows = null;
  if (!unprocessedWindows.isEmpty()) {
    residualInUnprocessedWindows =
        WindowedValue.of(
            KV.of(
                currentElement.getValue(),
                KV.of(currentRestriction, currentWatermarkEstimatorState)),
            currentElement.getTimestamp(),
            unprocessedWindows,
            currentElement.getPane());
  }

  return WindowedSplitResult.forRoots(
      primaryInProcessedWindows, primarySplit, residualSplit, residualInUnprocessedWindows);
}
/**
 * Computes a split for an element that is being processed or truncated across several windows.
 *
 * <p>Exactly one of {@code currentTracker} and {@code splitDelegate} must be non-null: the tracker
 * is used to split the restriction directly, the delegate is used to obtain a downstream split.
 * When a tracker is supplied, {@code watermarkAndState} must be non-null as well.
 *
 * <p>If the current window is not the last one, the requested fraction is scaled over all
 * remaining windows. When the scaled amount reaches past the work remaining in the current
 * window, the split happens on a window boundary; otherwise the element is split within the
 * current window and all later windows become residual. On the last window only an element-level
 * split is attempted.
 *
 * @return the window split, the downstream split (if any) and the new window stop index, or
 *     {@code null} when the current window is the last one and no element-level split was
 *     produced
 * @throws IllegalArgumentException if both or neither of {@code currentTracker} and {@code
 *     splitDelegate} are provided
 */
@VisibleForTesting
static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcessOrTruncate(
WindowedValue currentElement,
Object currentRestriction,
BoundedWindow currentWindow,
List<BoundedWindow> windows,
WatermarkEstimatorStateT currentWatermarkEstimatorState,
double fractionOfRemainder,
RestrictionTracker currentTracker,
HandlesSplits splitDelegate,
KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
int currentWindowIndex,
int stopWindowIndex) {
// We should only have currentTracker or splitDelegate.
checkArgument((currentTracker != null) ^ (splitDelegate != null));
// When we have currentTracker, the watermarkAndState should not be null.
if (currentTracker != null) {
checkNotNull(watermarkAndState);
}
WindowedSplitResult windowedSplitResult = null;
HandlesSplits.SplitResult downstreamSplitResult = null;
int newWindowStopIndex = stopWindowIndex;
// If we are not on the last window, try to compute the split which is on the current window or
// on a future window.
if (currentWindowIndex != stopWindowIndex - 1) {
// Compute the fraction of the remainder relative to the scaled progress.
Progress elementProgress;
if (currentTracker != null) {
if (currentTracker instanceof HasProgress) {
elementProgress = ((HasProgress) currentTracker).getProgress();
} else {
// Trackers that do not report progress are treated as not having started.
elementProgress = Progress.from(0, 1);
}
} else {
// The delegate reports a completed fraction; the total work is taken to be 1.
double elementCompleted = splitDelegate.getProgress();
elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
}
Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
// Amount of work, across all remaining windows, that the primary should keep.
double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
// The fraction is out of the current window and hence we will split at the closest window
// boundary.
if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
// Round to the nearest window boundary, keeping at least the current window in the primary
// and at least the last window in the residual.
newWindowStopIndex =
(int)
Math.min(
stopWindowIndex - 1,
currentWindowIndex
+ Math.max(
1,
Math.round(
(elementProgress.getWorkCompleted() + scaledFractionOfRemainder)
/ (elementProgress.getWorkCompleted()
+ elementProgress.getWorkRemaining()))));
windowedSplitResult =
computeWindowSplitResult(
currentElement,
currentRestriction,
currentWindow,
windows,
currentWatermarkEstimatorState,
newWindowStopIndex,
newWindowStopIndex,
stopWindowIndex,
null,
watermarkAndState);
} else {
// Compute the element split with the scaled fraction.
SplitResult<?> elementSplit = null;
if (currentTracker != null) {
elementSplit =
currentTracker.trySplit(
scaledFractionOfRemainder / elementProgress.getWorkRemaining());
} else {
// NOTE(review): unlike the tracker branch, the delegate receives the scaled amount without
// dividing by the element's remaining work - confirm this is intended.
downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
}
newWindowStopIndex = currentWindowIndex + 1;
// If no element split happened, the current window counts as fully processed primary;
// otherwise only the windows before it do.
int toIndex =
(elementSplit == null && downstreamSplitResult == null)
? newWindowStopIndex
: currentWindowIndex;
windowedSplitResult =
computeWindowSplitResult(
currentElement,
currentRestriction,
currentWindow,
windows,
currentWatermarkEstimatorState,
toIndex,
newWindowStopIndex,
stopWindowIndex,
elementSplit,
watermarkAndState);
}
} else {
// We are on the last window then compute the element split with given fraction.
SplitResult<?> elementSplitResult = null;
newWindowStopIndex = stopWindowIndex;
if (currentTracker != null) {
elementSplitResult = currentTracker.trySplit(fractionOfRemainder);
} else {
downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
}
// Neither the tracker nor the delegate could split: there is nothing to report.
if (elementSplitResult == null && downstreamSplitResult == null) {
return null;
}
windowedSplitResult =
computeWindowSplitResult(
currentElement,
currentRestriction,
currentWindow,
windows,
currentWatermarkEstimatorState,
currentWindowIndex,
stopWindowIndex,
stopWindowIndex,
elementSplitResult,
watermarkAndState);
}
return SplitResultsWithStopIndex.of(
windowedSplitResult, downstreamSplitResult, newWindowStopIndex);
}
/**
 * Encodes a computed split into the primary and residual roots reported to the runner.
 *
 * <p>Whole-window roots of {@code windowedSplitResult} are encoded first. Then either the element
 * split carried by {@code windowedSplitResult} or the single primary/residual pair of {@code
 * downstreamElementSplit} is appended; the two are mutually exclusive.
 *
 * <p>Output watermarks are attached to residuals unless the watermark equals {@link
 * GlobalWindow#TIMESTAMP_MIN_VALUE}. They are converted to protobuf timestamps with floor
 * division so that {@code nanos} is never negative, including for instants before the epoch.
 *
 * @param windowedSplitResult the window-level split, may be {@code null}
 * @param downstreamElementSplit the element split produced downstream, may be {@code null}
 * @param fullInputCoder coder used to encode every root element
 * @param initialWatermark watermark reported for the residual in unprocessed windows
 * @param watermarkAndState watermark and estimator state used for the residual split root; read
 *     only when {@code windowedSplitResult} carries a residual split root
 * @param pTransformId id of the transform the roots apply to
 * @param mainInputId id of the input the roots apply to
 * @param outputIds ids of the outputs that receive the output watermark
 * @param resumeDelay requested delay for the residual split root; must be non-null when {@code
 *     windowedSplitResult} carries a residual split root
 * @return the encoded primary and residual roots
 * @throws IllegalArgumentException if both {@code windowedSplitResult} and {@code
 *     downstreamElementSplit} contain an element split
 */
@VisibleForTesting
static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult(
    WindowedSplitResult windowedSplitResult,
    HandlesSplits.SplitResult downstreamElementSplit,
    Coder fullInputCoder,
    Instant initialWatermark,
    KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
    String pTransformId,
    String mainInputId,
    Collection<String> outputIds,
    Duration resumeDelay) {
  // The element split cannot come from both windowedSplitResult and downstreamElementSplit.
  checkArgument(
      (windowedSplitResult == null || windowedSplitResult.getResidualSplitRoot() == null)
          || downstreamElementSplit == null);
  List<BundleApplication> primaryRoots = new ArrayList<>();
  List<DelayedBundleApplication> residualRoots = new ArrayList<>();

  // Encode window splits.
  if (windowedSplitResult != null
      && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
    ByteStringOutputStream primaryInOtherWindowsBytes = new ByteStringOutputStream();
    try {
      fullInputCoder.encode(
          windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
          primaryInOtherWindowsBytes);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    BundleApplication.Builder primaryApplicationInOtherWindows =
        BundleApplication.newBuilder()
            .setTransformId(pTransformId)
            .setInputId(mainInputId)
            .setElement(primaryInOtherWindowsBytes.toByteString());
    primaryRoots.add(primaryApplicationInOtherWindows.build());
  }
  if (windowedSplitResult != null
      && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
    ByteStringOutputStream bytesOut = new ByteStringOutputStream();
    try {
      fullInputCoder.encode(windowedSplitResult.getResidualInUnprocessedWindowsRoot(), bytesOut);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    BundleApplication.Builder residualInUnprocessedWindowsRoot =
        BundleApplication.newBuilder()
            .setTransformId(pTransformId)
            .setInputId(mainInputId)
            .setElement(bytesOut.toByteString());
    // We don't want to change the output watermarks or set the checkpoint resume time since
    // that applies to the current window.
    Map<String, org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Timestamp>
        outputWatermarkMapForUnprocessedWindows = new HashMap<>();
    if (!initialWatermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
      // Floor division keeps nanos within [0, 999_999_999] for negative millis.
      long initialWatermarkMillis = initialWatermark.getMillis();
      org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Timestamp outputWatermark =
          org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Timestamp.newBuilder()
              .setSeconds(Math.floorDiv(initialWatermarkMillis, 1000L))
              .setNanos(((int) Math.floorMod(initialWatermarkMillis, 1000L)) * 1_000_000)
              .build();
      for (String outputId : outputIds) {
        outputWatermarkMapForUnprocessedWindows.put(outputId, outputWatermark);
      }
    }
    residualInUnprocessedWindowsRoot.putAllOutputWatermarks(
        outputWatermarkMapForUnprocessedWindows);
    residualRoots.add(
        DelayedBundleApplication.newBuilder()
            .setApplication(residualInUnprocessedWindowsRoot)
            .build());
  }

  ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
  ByteStringOutputStream residualBytes = new ByteStringOutputStream();
  // Encode element split from windowedSplitResult or from downstream element split. It's possible
  // that there is no element split.
  if (windowedSplitResult != null && windowedSplitResult.getResidualSplitRoot() != null) {
    // When there is element split in windowedSplitResult, the resumeDelay should not be null.
    checkNotNull(resumeDelay);
    try {
      fullInputCoder.encode(windowedSplitResult.getPrimarySplitRoot(), primaryBytes);
      fullInputCoder.encode(windowedSplitResult.getResidualSplitRoot(), residualBytes);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    primaryRoots.add(
        BundleApplication.newBuilder()
            .setTransformId(pTransformId)
            .setInputId(mainInputId)
            .setElement(primaryBytes.toByteString())
            .build());
    BundleApplication.Builder residualApplication =
        BundleApplication.newBuilder()
            .setTransformId(pTransformId)
            .setInputId(mainInputId)
            .setElement(residualBytes.toByteString());
    Map<String, org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Timestamp>
        outputWatermarkMap = new HashMap<>();
    if (!watermarkAndState.getKey().equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
      // Floor division keeps nanos within [0, 999_999_999] for negative millis.
      long watermarkMillis = watermarkAndState.getKey().getMillis();
      org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Timestamp outputWatermark =
          org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Timestamp.newBuilder()
              .setSeconds(Math.floorDiv(watermarkMillis, 1000L))
              .setNanos(((int) Math.floorMod(watermarkMillis, 1000L)) * 1_000_000)
              .build();
      for (String outputId : outputIds) {
        outputWatermarkMap.put(outputId, outputWatermark);
      }
    }
    residualApplication.putAllOutputWatermarks(outputWatermarkMap);
    residualRoots.add(
        DelayedBundleApplication.newBuilder()
            .setApplication(residualApplication)
            .setRequestedTimeDelay(Durations.fromMillis(resumeDelay.getMillis()))
            .build());
  } else if (downstreamElementSplit != null) {
    primaryRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getPrimaryRoots()));
    residualRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getResidualRoots()));
  }
  return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
}
/**
 * Attempts to split the element and restriction currently being processed.
 *
 * <p>The split is computed while holding {@code splitLock}. The initial watermark is captured
 * under the same lock, because the processing thread overwrites or clears {@code
 * initialWatermark} once it moves on to the next window or finishes the element; reading the
 * field after releasing the lock could pair this split with another window's watermark or with
 * {@code null}.
 *
 * @param fractionOfRemainder the fraction of the remaining work the primary should keep
 * @param resumeDelay the delay requested before the residual is resumed
 * @return the split result, or {@code null} when no element is being processed or no split could
 *     be computed
 */
private HandlesSplits.SplitResult trySplitForElementAndRestriction(
    double fractionOfRemainder, Duration resumeDelay) {
  KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
  WindowedSplitResult windowedSplitResult = null;
  Instant initialWatermarkForSplit;
  synchronized (splitLock) {
    // There is nothing to split if we are between element and restriction processing calls.
    if (currentTracker == null) {
      return null;
    }
    // Snapshot the watermark that belongs to the window this split is computed for.
    initialWatermarkForSplit = initialWatermark;
    // Make sure to get the output watermark before we split to ensure that the lower bound
    // applies to the residual.
    watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
    SplitResultsWithStopIndex splitResult =
        computeSplitForProcessOrTruncate(
            currentElement,
            currentRestriction,
            currentWindow,
            currentWindows,
            currentWatermarkEstimatorState,
            fractionOfRemainder,
            currentTracker,
            null,
            watermarkAndState,
            windowCurrentIndex,
            windowStopIndex);
    if (splitResult == null) {
      return null;
    }
    windowStopIndex = splitResult.getNewWindowStopIndex();
    // Populate the size of primary/residual.
    windowedSplitResult =
        calculateRestrictionSize(
            splitResult.getWindowSplit(),
            PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
                + "/GetSize");
  }
  Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
  return constructSplitResult(
      windowedSplitResult,
      null,
      fullInputCoder,
      initialWatermarkForSplit,
      watermarkAndState,
      pTransformId,
      mainInputId,
      pTransform.getOutputsMap().keySet(),
      resumeDelay);
}
/**
 * Fires a timer delivered by the runner, once per window of the timer.
 *
 * <p>Before the delivered timer is fired, any timers set earlier in this bundle for the same time
 * domain that order at or before it are fired inline. The delivered timer itself is skipped when
 * the bundle has recorded a different timer under the same id.
 *
 * @param timerIdOrTimerFamilyId the timer id or timer family id the timer was declared with
 * @param timeDomain the time domain of the timer
 * @param timer the timer to fire
 */
private <K> void processTimer(
    String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
  checkNotNull(timerBundleTracker);
  try {
    currentKey = timer.getUserKey();
    for (BoundedWindow window : timer.getWindows()) {
      currentWindow = window;
      Modifications bundleModifications = timerBundleTracker.getBundleModifications();
      Table<String, String, Timer<K>> modifiedTimerIds =
          bundleModifications.getModifiedTimerIds();
      NavigableSet<TimerInfo<K>> earlierTimers =
          bundleModifications
              .getModifiedTimersOrdered(timeDomain)
              .headSet(TimerInfo.of(timer, "", timeDomain), true);
      while (!earlierTimers.isEmpty()) {
        TimerInfo<K> insertedTimer = earlierTimers.pollFirst();
        Timer<K> pendingTimer = insertedTimer.getTimer();
        String familyOrId = insertedTimer.getTimerFamilyOrId();
        if (timerModified(modifiedTimerIds, familyOrId, pendingTimer)) {
          continue;
        }
        String dynamicTag = pendingTimer.getDynamicTimerTag();
        boolean hasDynamicTag = !dynamicTag.isEmpty();
        String timerId = hasDynamicTag ? dynamicTag : familyOrId;
        String timerFamily = hasDynamicTag ? familyOrId : "";

        // If this timer was created previously in the bundle as an overwrite of a previous
        // timer, we must make sure to clear the old timer. Since we are firing the timer inline,
        // the runner doesn't know that the old timer was overwritten, and will otherwise fire
        // it - causing a spurious timer fire.
        modifiedTimerIds.put(
            familyOrId,
            dynamicTag,
            Timer.cleared(pendingTimer.getUserKey(), dynamicTag, pendingTimer.getWindows()));
        // It's important to call processTimer after inserting the above deletion, otherwise the
        // above line would overwrite any looping timer with a deletion.
        processTimerDirect(timerFamily, timerId, insertedTimer.getTimeDomain(), pendingTimer);
      }

      if (!timerModified(modifiedTimerIds, timerIdOrTimerFamilyId, timer)) {
        // The timerIdOrTimerFamilyId contains either a timerId from timer declaration or
        // timerFamilyId from timer family declaration.
        boolean isFamily = timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX);
        String timerId = isFamily ? "" : timerIdOrTimerFamilyId;
        String timerFamilyId = isFamily ? timerIdOrTimerFamilyId : "";
        processTimerDirect(timerFamilyId, timerId, timeDomain, timer);
      }
    }
  } finally {
    currentWindow = null;
    currentTimeDomain = null;
    currentTimer = null;
    currentKey = null;
  }
}
/**
 * Tells whether the bundle has recorded a timer under the given id and dynamic tag that differs
 * from {@code timer}.
 *
 * @param modifiedTimerIds timers recorded by the bundle, keyed by family-or-id and dynamic tag
 * @param timerFamilyOrId the timer id or timer family id to look up
 * @param timer the timer to compare against the recorded one
 * @return {@code true} only if a recorded timer exists and is not equal to {@code timer}
 */
private <K> boolean timerModified(
    Table<String, String, Timer<K>> modifiedTimerIds, String timerFamilyOrId, Timer<K> timer) {
  @Nullable
  Timer<K> recordedTimer = modifiedTimerIds.get(timerFamilyOrId, timer.getDynamicTimerTag());
  if (recordedTimer == null) {
    return false;
  }
  return !recordedTimer.equals(timer);
}
/**
 * Publishes the timer and its time domain as the current ones and invokes the DoFn's on-timer
 * method. The caller is responsible for clearing the published state afterwards.
 *
 * @param timerFamilyId the timer family id, or an empty string
 * @param timerId the timer id, or an empty string
 * @param timeDomain the time domain of the timer
 * @param timer the timer being fired
 */
private <K> void processTimerDirect(
    String timerFamilyId, String timerId, TimeDomain timeDomain, Timer<K> timer) {
  this.currentTimer = timer;
  this.currentTimeDomain = timeDomain;
  this.doFnInvoker.invokeOnTimer(timerId, timerFamilyId, this.onTimerContext);
}
/**
 * Invokes the DoFn's on-window-expiration method once per window of the given timer, with the
 * timer's user key as the current key.
 *
 * @param timer the timer signalling window expiration
 */
private <K> void processOnWindowExpiration(Timer<K> timer) {
  try {
    currentKey = timer.getUserKey();
    currentTimer = timer;
    for (BoundedWindow window : timer.getWindows()) {
      currentWindow = window;
      doFnInvoker.invokeOnWindowExpiration(onWindowExpirationContext);
    }
  } finally {
    currentWindow = null;
    currentTimer = null;
    currentKey = null;
  }
}
/**
 * Completes the bundle: outputs the timers tracked for this bundle (when timers are in use),
 * invokes the DoFn's finish-bundle method, and finalizes the state accessor.
 *
 * @throws Exception if finishing the bundle fails
 */
private void finishBundle() throws Exception {
  if (this.timerBundleTracker != null) {
    this.timerBundleTracker.outputTimers(this.outboundTimerReceivers::get);
  }
  this.doFnInvoker.invokeFinishBundle(this.finishBundleArgumentProvider);
  this.stateAccessor.finalizeState();
}
/** Invokes the DoFn's teardown method. */
private void tearDown() {
  this.doFnInvoker.invokeTeardown();
}
/**
 * Delivers {@code output} to every consumer in iteration order, wrapping anything thrown by a
 * consumer in a {@link UserCodeException}.
 *
 * <p>If the current watermark estimator observes timestamps, it is notified of the output's
 * timestamp before delivery.
 */
private <T> void outputTo(
    Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> output) {
  if (currentWatermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
    TimestampObservingWatermarkEstimator observingEstimator =
        (TimestampObservingWatermarkEstimator) currentWatermarkEstimator;
    observingEstimator.observeTimestamp(output.getTimestamp());
  }
  try {
    Iterator<FnDataReceiver<WindowedValue<T>>> remaining = consumers.iterator();
    while (remaining.hasNext()) {
      remaining.next().accept(output);
    }
  } catch (Throwable t) {
    throw UserCodeException.wrap(t);
  }
}
/**
 * {@link org.apache.beam.sdk.state.Timer} implementation that records timer modifications in the
 * timer bundle tracker.
 *
 * <p>The builder-style methods ({@link #offset}, {@link #align}, {@link #withOutputTimestamp},
 * {@link #withNoOutputTimestamp}) only store configuration; {@link #set}, {@link #setRelative}
 * and {@link #clear} report the resulting timer to the tracker. The same instance may be set
 * several times: each call derives its output timestamp from the stored configuration without
 * modifying it.
 */
private class FnApiTimer<K> implements org.apache.beam.sdk.state.Timer {
  private final String timerIdOrFamily;
  private final K userKey;
  private final String dynamicTimerTag;
  private final TimeDomain timeDomain;
  private final Instant fireTimestamp;
  private final Instant elementTimestampOrTimerHoldTimestamp;
  private final BoundedWindow boundedWindow;
  private final PaneInfo paneInfo;
  // Output timestamp explicitly requested by the user, or null when none was requested.
  private @Nullable Instant outputTimestamp;
  // True when the user explicitly asked for a timer without an output timestamp.
  private boolean noOutputTimestamp;
  private Duration period = Duration.ZERO;
  private Duration offset = Duration.ZERO;

  /**
   * @param timerIdOrFamily id under which modifications are reported to the tracker
   * @param userKey key the timer belongs to
   * @param dynamicTimerTag dynamic tag of the timer
   * @param boundedWindow window the timer is set in
   * @param elementTimestampOrTimerHoldTimestamp base used for output timestamp validation and as
   *     the default output timestamp of processing-time timers
   * @param elementTimestampOrTimerFireTimestamp base for relative event-time timers
   * @param paneInfo pane attached to timers produced by this object
   * @param timeDomain time domain of the timer
   * @throws IllegalArgumentException if the time domain is neither event time nor processing time
   */
  FnApiTimer(
      String timerIdOrFamily,
      K userKey,
      String dynamicTimerTag,
      BoundedWindow boundedWindow,
      Instant elementTimestampOrTimerHoldTimestamp,
      Instant elementTimestampOrTimerFireTimestamp,
      PaneInfo paneInfo,
      TimeDomain timeDomain) {
    this.timerIdOrFamily = timerIdOrFamily;
    this.userKey = userKey;
    this.dynamicTimerTag = dynamicTimerTag;
    this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
    this.boundedWindow = boundedWindow;
    this.paneInfo = paneInfo;
    this.noOutputTimestamp = false;
    this.timeDomain = timeDomain;

    switch (timeDomain) {
      case EVENT_TIME:
        fireTimestamp = elementTimestampOrTimerFireTimestamp;
        break;
      case PROCESSING_TIME:
        // TODO: This should use an injected clock when using TestStream.
        fireTimestamp = new Instant(DateTimeUtils.currentTimeMillis());
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Unknown or unsupported time domain %s", timeDomain));
    }
  }

  /**
   * Sets the timer to fire at {@code absoluteTime}.
   *
   * @throws IllegalArgumentException if an event-time timer is set after the window's garbage
   *     collection time, or if the output timestamp is invalid
   */
  @Override
  public void set(Instant absoluteTime) {
    checkNotNull(timerBundleTracker);
    // Ensures that the target time is reasonable. For event time timers this means that the time
    // should be prior to window GC time.
    if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
      Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
      checkArgument(
          !absoluteTime.isAfter(windowExpiry),
          "Attempted to set event time timer for %s but that is after"
              + " the expiration of window %s",
          absoluteTime,
          windowExpiry);
    }
    timerBundleTracker.timerModified(timerIdOrFamily, timeDomain, getTimerForTime(absoluteTime));
  }

  /**
   * Sets the timer relative to the base fire timestamp using the configured offset and period.
   * For event-time timers the target is capped at the window's garbage collection time.
   */
  @Override
  public void setRelative() {
    checkNotNull(timerBundleTracker);
    Instant target;
    if (period.equals(Duration.ZERO)) {
      target = fireTimestamp.plus(offset);
    } else {
      long millisSinceStart = fireTimestamp.plus(offset).getMillis() % period.getMillis();
      target =
          millisSinceStart == 0
              ? fireTimestamp
              : fireTimestamp.plus(period).minus(Duration.millis(millisSinceStart));
    }
    target = minTargetAndGcTime(target);
    timerBundleTracker.timerModified(timerIdOrFamily, timeDomain, getTimerForTime(target));
  }

  /** Reports a cleared timer for this key, tag and window to the tracker. */
  @Override
  public void clear() {
    checkNotNull(timerBundleTracker);
    timerBundleTracker.timerModified(timerIdOrFamily, timeDomain, getClearedTimer());
  }

  @Override
  public org.apache.beam.sdk.state.Timer offset(Duration offset) {
    this.offset = offset;
    return this;
  }

  @Override
  public org.apache.beam.sdk.state.Timer align(Duration period) {
    this.period = period;
    return this;
  }

  @Override
  public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant outputTime) {
    this.outputTimestamp = outputTime;
    this.noOutputTimestamp = false;
    return this;
  }

  @Override
  public org.apache.beam.sdk.state.Timer withNoOutputTimestamp() {
    this.outputTimestamp = null;
    this.noOutputTimestamp = true;
    return this;
  }

  @Override
  public Instant getCurrentRelativeTime() {
    return fireTimestamp;
  }

  /**
   * For event time timers the target time should be prior to window GC time. So it returns
   * min(time to set, GC Time of window).
   */
  private Instant minTargetAndGcTime(Instant target) {
    if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
      Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
      if (target.isAfter(windowExpiry)) {
        return windowExpiry;
      }
    }
    return target;
  }

  private Timer<K> getClearedTimer() {
    return Timer.cleared(userKey, dynamicTimerTag, Collections.singletonList(boundedWindow));
  }

  /**
   * Builds the timer that fires at {@code scheduledTime}.
   *
   * <p>The output timestamp is resolved as follows: the user-supplied one if present (validated
   * against the allowed skew); otherwise, unless the user asked for no output timestamp, the
   * scheduled time for event-time timers and the element/hold timestamp for processing-time
   * timers; otherwise one millisecond past {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
   *
   * <p>The resolution happens in a local variable. Writing the derived value back into the
   * {@code outputTimestamp} field would make a later call on the same object treat it as a
   * user-supplied timestamp and reject otherwise valid timers.
   *
   * @throws IllegalArgumentException if the resolved timestamps violate the skew, firing-time or
   *     window-expiration constraints
   */
  @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
  private Timer<K> getTimerForTime(Instant scheduledTime) {
    Instant resolvedOutputTimestamp = outputTimestamp;

    if (resolvedOutputTimestamp != null) {
      Instant lowerBound;
      try {
        lowerBound = elementTimestampOrTimerHoldTimestamp.minus(doFn.getAllowedTimestampSkew());
      } catch (ArithmeticException e) {
        // Subtracting a very large skew overflowed; every timestamp is then late enough.
        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
      }
      if (resolvedOutputTimestamp.isBefore(lowerBound)
          || resolvedOutputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
        throw new IllegalArgumentException(
            String.format(
                "Cannot output timer with output timestamp %s. Output timestamps must be no "
                    + "earlier than the timestamp of the current input (%s) minus the allowed skew "
                    + "(%s) and no later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for "
                    + "details on changing the allowed skew.",
                resolvedOutputTimestamp,
                elementTimestampOrTimerHoldTimestamp,
                doFn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
                    ? doFn.getAllowedTimestampSkew()
                    : PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()),
                BoundedWindow.TIMESTAMP_MAX_VALUE));
      }
    } else if (!noOutputTimestamp) {
      // Output timestamp is set to the delivery time if not initialized by an user. For
      // processing timers it is the timestamp of the input element or the hold timestamp of the
      // firing timer.
      resolvedOutputTimestamp =
          TimeDomain.EVENT_TIME.equals(timeDomain)
              ? scheduledTime
              : elementTimestampOrTimerHoldTimestamp;
    }

    if (resolvedOutputTimestamp != null) {
      Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
      if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
        checkArgument(
            !resolvedOutputTimestamp.isAfter(scheduledTime),
            "Attempted to set an event-time timer with an output timestamp of %s that is"
                + " after the timer firing timestamp %s",
            resolvedOutputTimestamp,
            scheduledTime);
        checkArgument(
            !scheduledTime.isAfter(windowExpiry),
            "Attempted to set an event-time timer with a firing timestamp of %s that is"
                + " after the expiration of window %s",
            scheduledTime,
            windowExpiry);
      } else {
        checkArgument(
            !resolvedOutputTimestamp.isAfter(windowExpiry),
            "Attempted to set a processing-time timer with an output timestamp of %s that is"
                + " after the expiration of window %s",
            resolvedOutputTimestamp,
            windowExpiry);
      }
    } else {
      // The user asked for no output timestamp.
      resolvedOutputTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
    }

    return Timer.of(
        userKey,
        dynamicTimerTag,
        Collections.singletonList(boundedWindow),
        scheduledTime,
        resolvedOutputTimestamp,
        paneInfo);
  }
}
/**
 * {@link TimerMap} for one timer family. Every lookup creates a new {@link FnApiTimer} for the
 * requested dynamic timer tag, sharing this map's key, window, timestamps, pane and time domain.
 */
private class FnApiTimerMap<K> implements TimerMap {
  private final String timerFamilyId;
  private final K userKey;
  private final TimeDomain timeDomain;
  private final Instant elementTimestampOrTimerHoldTimestamp;
  private final Instant elementTimestampOrTimerFireTimestamp;
  private final BoundedWindow boundedWindow;
  private final PaneInfo paneInfo;

  /** The time domain is looked up from the ParDo payload's timer family specs by family id. */
  FnApiTimerMap(
      String timerFamilyId,
      K userKey,
      BoundedWindow boundedWindow,
      Instant elementTimestampOrTimerHoldTimestamp,
      Instant elementTimestampOrTimerFireTimestamp,
      PaneInfo paneInfo) {
    this.timerFamilyId = timerFamilyId;
    this.userKey = userKey;
    this.boundedWindow = boundedWindow;
    this.paneInfo = paneInfo;
    this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
    this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
    this.timeDomain =
        translateTimeDomain(
            parDoPayload.getTimerFamilySpecsMap().get(timerFamilyId).getTimeDomain());
  }

  /** Shorthand for looking up the tagged timer and setting it to {@code absoluteTime}. */
  @Override
  public void set(String dynamicTimerTag, Instant absoluteTime) {
    org.apache.beam.sdk.state.Timer taggedTimer = get(dynamicTimerTag);
    taggedTimer.set(absoluteTime);
  }

  /** Returns a new timer of this family for the given dynamic tag. */
  @Override
  public org.apache.beam.sdk.state.Timer get(String dynamicTimerTag) {
    return new FnApiTimer(
        timerFamilyId,
        userKey,
        dynamicTimerTag,
        boundedWindow,
        elementTimestampOrTimerHoldTimestamp,
        elementTimestampOrTimerFireTimestamp,
        paneInfo,
        timeDomain);
  }
}
/**
 * Validates an output timestamp against the current element.
 *
 * <p>The timestamp must be no earlier than the current element's timestamp minus the DoFn's
 * allowed skew, and no later than {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
 *
 * @throws IllegalArgumentException if the timestamp is outside that range
 */
@SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
private void checkTimestamp(Instant timestamp) {
  Instant earliestAllowed;
  try {
    earliestAllowed = currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
  } catch (ArithmeticException e) {
    // The subtraction overflowed, so fall back to the minimum timestamp.
    earliestAllowed = BoundedWindow.TIMESTAMP_MIN_VALUE;
  }

  if (!timestamp.isBefore(earliestAllowed)
      && !timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
    return;
  }

  // Very large skews are printed as a raw duration, everything else as a formatted period.
  Object skewDescription;
  if (doFn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE) {
    skewDescription = doFn.getAllowedTimestampSkew();
  } else {
    skewDescription = PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod());
  }
  throw new IllegalArgumentException(
      String.format(
          "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
              + "timestamp of the current input (%s) minus the allowed skew (%s) and no later "
              + "than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on "
              + "changing the allowed skew.",
          timestamp,
          currentElement.getTimestamp(),
          skewDescription,
          BoundedWindow.TIMESTAMP_MAX_VALUE));
}
/**
 * Argument provider used when invoking the DoFn's start-bundle method. It exposes a start-bundle
 * context, the pipeline options and the bundle finalizer.
 */
private class StartBundleArgumentProvider extends BaseArgumentProvider<InputT, OutputT> {
  // A single context instance is created with the provider and returned on every request.
  private final StartBundleArgumentProvider.Context context =
      new StartBundleArgumentProvider.Context();

  /** Start-bundle context that only exposes the pipeline options. */
  private class Context extends DoFn<InputT, OutputT>.StartBundleContext {
    Context() {
      doFn.super();
    }

    @Override
    public PipelineOptions getPipelineOptions() {
      return pipelineOptions;
    }
  }

  @Override
  public String getErrorContext() {
    return "FnApiDoFnRunner/StartBundle";
  }

  @Override
  public PipelineOptions pipelineOptions() {
    return pipelineOptions;
  }

  @Override
  public BundleFinalizer bundleFinalizer() {
    return bundleFinalizer;
  }

  @Override
  public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
    return context;
  }
}
private class FinishBundleArgumentProvider extends BaseArgumentProvider<InputT, OutputT> {
private class Context extends DoFn<InputT, OutputT>.FinishBundleContext {
Context() {
doFn.super();
}
@Override
public PipelineOptions getPipelineOptions() {
return pipelineOptions;
}
@Override
public void output(OutputT output, Instant timestamp, BoundedWindow window) {
outputTo(
mainOutputConsumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
}
@Override
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
Collection<FnDataReceiver<WindowedValue<T>>> consumers =
(Collection) localNameToConsumer.get(tag.getId());
if (consumers == null) {
throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
}
outputTo(consumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
}
}
private final FinishBundleArgumentProvider.Context context =
new FinishBundleArgumentProvider.Context();
@Override
public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
DoFn<InputT, OutputT> doFn) {
return context;
}
@Override
public PipelineOptions pipelineOptions() {
return pipelineOptions;
}
@Override
public BundleFinalizer bundleFinalizer() {
return bundleFinalizer;
}
@Override
public String getErrorContext() {
return "FnApiDoFnRunner/FinishBundle";
}
}
/**
 * Provides arguments for a {@link DoFnInvoker} for a window observing method.
 *
 * <p>Window, side input, state and timer access are all resolved against the current window and
 * the current element.
 */
private class WindowObservingProcessBundleContext extends ProcessBundleContextBase {
  @Override
  public BoundedWindow window() {
    return currentWindow;
  }

  @Override
  public Object sideInput(String tagId) {
    return sideInput(sideInputMapping.get(tagId));
  }

  @Override
  public <T> T sideInput(PCollectionView<T> view) {
    return stateAccessor.get(view, currentWindow);
  }

  /**
   * Binds the state declared under {@code stateId} to the state accessor.
   *
   * @param alwaysFetched when true, {@code readLater()} is called on the bound state before it is
   *     returned
   * @throws NullPointerException if the DoFn signature declares no state with that id
   */
  @Override
  public State state(String stateId, boolean alwaysFetched) {
    StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId);
    checkNotNull(stateDeclaration, "No state declaration found for %s", stateId);
    StateSpec<?> spec;
    try {
      spec = (StateSpec<?>) stateDeclaration.field().get(doFn);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    }
    State state = spec.bind(stateId, stateAccessor);
    if (alwaysFetched) {
      return (State) ((ReadableState) state).readLater();
    } else {
      return state;
    }
  }

  /**
   * Creates a timer for the current element's key.
   *
   * @throws IllegalStateException if the current element is not a {@link KV}
   */
  @Override
  public org.apache.beam.sdk.state.Timer timer(String timerId) {
    checkState(
        currentElement.getValue() instanceof KV,
        "Accessing timer in unkeyed context. Current element is not a KV: %s.",
        currentElement.getValue());
    // For the initial timestamps we pass in the current elements timestamp for the hold timestamp
    // and the current element's timestamp which will be used for the fire timestamp if this
    // timer is in the EVENT time domain.
    TimeDomain timeDomain =
        translateTimeDomain(parDoPayload.getTimerFamilySpecsMap().get(timerId).getTimeDomain());
    return new FnApiTimer(
        timerId,
        ((KV) currentElement.getValue()).getKey(),
        "",
        currentWindow,
        currentElement.getTimestamp(),
        currentElement.getTimestamp(),
        currentElement.getPane(),
        timeDomain);
  }

  /**
   * Creates a timer map for the current element's key.
   *
   * @throws IllegalStateException if the current element is not a {@link KV}
   */
  @Override
  public TimerMap timerFamily(String timerFamilyId) {
    // Same keyed-context guard as timer(String): without it an unkeyed element would only
    // surface as a bare ClassCastException from the cast below.
    checkState(
        currentElement.getValue() instanceof KV,
        "Accessing timer family in unkeyed context. Current element is not a KV: %s.",
        currentElement.getValue());
    return new FnApiTimerMap(
        timerFamilyId,
        ((KV) currentElement.getValue()).getKey(),
        currentWindow,
        currentElement.getTimestamp(),
        currentElement.getTimestamp(),
        currentElement.getPane());
  }

  @Override
  public void outputWithTimestamp(OutputT output, Instant timestamp) {
    // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
    outputTo(
        mainOutputConsumers,
        WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
  }

  /** @throws IllegalArgumentException if no consumers are registered for the tag's id */
  @Override
  public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
    // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
    Collection<FnDataReceiver<WindowedValue<T>>> consumers =
        (Collection) localNameToConsumer.get(tag.getId());
    if (consumers == null) {
      throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
    }
    outputTo(
        consumers, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
  }
}
/**
 * This context outputs {@code KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size>}.
 *
 * <p>Each output restriction is sized through the DoFn's get-size method and emitted in the
 * current window.
 */
private class SizedRestrictionWindowObservingProcessBundleContext
    extends WindowObservingProcessBundleContext {
  private final String errorContextPrefix;

  SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) {
    this.errorContextPrefix = errorContextPrefix;
  }

  @Override
  // OutputT == RestrictionT
  public void outputWithTimestamp(OutputT output, Instant timestamp) {
    checkTimestamp(timestamp);

    // Arguments for get-size: delegate to this context, but substitute the restriction being
    // output, its timestamp and a tracker created for it.
    DelegatingArgumentProvider<InputT, OutputT> sizeArguments =
        new DelegatingArgumentProvider<InputT, OutputT>(
            this, this.errorContextPrefix + "/GetSize") {
          @Override
          public Object restriction() {
            return output;
          }

          @Override
          public Instant timestamp(DoFn<InputT, OutputT> doFn) {
            return timestamp;
          }

          @Override
          public RestrictionTracker<?, ?> restrictionTracker() {
            return doFnInvoker.invokeNewTracker(this);
          }
        };
    double size = doFnInvoker.invokeGetSize(sizeArguments);

    outputTo(
        mainOutputConsumers,
        (WindowedValue<OutputT>)
            WindowedValue.of(
                KV.of(
                    KV.of(
                        currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
                    size),
                timestamp,
                currentWindow,
                currentElement.getPane()));
  }
}
/**
 * This context outputs {@code KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size>}.
 *
 * <p>Each output restriction is sized through the DoFn's get-size method and emitted in all of
 * the current element's windows.
 */
private class SizedRestrictionNonWindowObservingProcessBundleContext
    extends NonWindowObservingProcessBundleContext {
  private final String errorContextPrefix;

  SizedRestrictionNonWindowObservingProcessBundleContext(String errorContextPrefix) {
    this.errorContextPrefix = errorContextPrefix;
  }

  @Override
  // OutputT == RestrictionT
  public void outputWithTimestamp(OutputT output, Instant timestamp) {
    checkTimestamp(timestamp);

    // Arguments for get-size: delegate to this context, but substitute the restriction being
    // output, its timestamp and a tracker created for it.
    DelegatingArgumentProvider<InputT, OutputT> sizeArguments =
        new DelegatingArgumentProvider<InputT, OutputT>(
            this, errorContextPrefix + "/GetSize") {
          @Override
          public Object restriction() {
            return output;
          }

          @Override
          public Instant timestamp(DoFn<InputT, OutputT> doFn) {
            return timestamp;
          }

          @Override
          public RestrictionTracker<?, ?> restrictionTracker() {
            return doFnInvoker.invokeNewTracker(this);
          }
        };
    double size = doFnInvoker.invokeGetSize(sizeArguments);

    outputTo(
        mainOutputConsumers,
        (WindowedValue<OutputT>)
            WindowedValue.of(
                KV.of(
                    KV.of(
                        currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
                    size),
                timestamp,
                currentElement.getWindows(),
                currentElement.getPane()));
  }
}
/**
 * Provides arguments for a {@link DoFnInvoker} for a non-window observing method.
 *
 * <p>Outputs are emitted into all of the current element's windows at once. Window, side input,
 * state and timer access are rejected with {@link UnsupportedOperationException}.
 */
private class NonWindowObservingProcessBundleContext extends ProcessBundleContextBase {
  @Override
  public BoundedWindow window() {
    throw new UnsupportedOperationException(
        "Cannot access window in non-window observing context.");
  }

  @Override
  public Object sideInput(String tagId) {
    throw new UnsupportedOperationException(
        "Cannot access sideInput in non-window observing context.");
  }

  @Override
  public <T> T sideInput(PCollectionView<T> view) {
    throw new UnsupportedOperationException(
        "Cannot access sideInput in non-window observing context.");
  }

  @Override
  public State state(String stateId, boolean alwaysFetched) {
    throw new UnsupportedOperationException(
        "Cannot access state in non-window observing context.");
  }

  @Override
  public org.apache.beam.sdk.state.Timer timer(String timerId) {
    throw new UnsupportedOperationException(
        "Cannot access timer in non-window observing context.");
  }

  @Override
  public TimerMap timerFamily(String timerFamilyId) {
    throw new UnsupportedOperationException(
        "Cannot access timerFamily in non-window observing context.");
  }

  /** Validates the timestamp, then emits to the main output consumers. */
  @Override
  public void outputWithTimestamp(OutputT output, Instant timestamp) {
    checkTimestamp(timestamp);
    WindowedValue<OutputT> windowedOutput =
        WindowedValue.of(
            output, timestamp, currentElement.getWindows(), currentElement.getPane());
    outputTo(mainOutputConsumers, windowedOutput);
  }

  /**
   * Validates the timestamp, then emits to the consumers registered for the tag.
   *
   * @throws IllegalArgumentException if no consumers are registered for the tag's id
   */
  @Override
  public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
    checkTimestamp(timestamp);
    Collection<FnDataReceiver<WindowedValue<T>>> taggedConsumers =
        (Collection) localNameToConsumer.get(tag.getId());
    if (taggedConsumers == null) {
      throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
    }
    WindowedValue<T> windowedOutput =
        WindowedValue.of(
            output, timestamp, currentElement.getWindows(), currentElement.getPane());
    outputTo(taggedConsumers, windowedOutput);
  }
}
/**
 * Base implementation that does not override methods which need to be window aware.
 *
 * <p>It answers element-related requests from the current element, rejects everything that only
 * makes sense outside of element processing (bundle contexts, timer parameters) with {@link
 * UnsupportedOperationException}, and routes un-timestamped outputs through the subclass's
 * {@code outputWithTimestamp} using the current element's timestamp.
 */
private abstract class ProcessBundleContextBase extends DoFn<InputT, OutputT>.ProcessContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
  private ProcessBundleContextBase() {
    doFn.super();
  }

  @Override
  public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
    return pane();
  }

  @Override
  public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
    throw new UnsupportedOperationException(
        "Cannot access StartBundleContext outside of @StartBundle method.");
  }

  @Override
  public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
      DoFn<InputT, OutputT> doFn) {
    throw new UnsupportedOperationException(
        "Cannot access FinishBundleContext outside of @FinishBundle method.");
  }

  @Override
  public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
    return this;
  }

  @Override
  public InputT element(DoFn<InputT, OutputT> doFn) {
    return element();
  }

  @Override
  public Object key() {
    throw new UnsupportedOperationException(
        "Cannot access key as parameter outside of @OnTimer method.");
  }

  /** Applies the element converter registered at {@code index} to the current element. */
  @Override
  public Object schemaElement(int index) {
    SerializableFunction converter = doFnSchemaInformation.getElementConverters().get(index);
    return converter.apply(element());
  }

  @Override
  public Instant timestamp(DoFn<InputT, OutputT> doFn) {
    return timestamp();
  }

  @Override
  public String timerId(DoFn<InputT, OutputT> doFn) {
    throw new UnsupportedOperationException(
        "Cannot access timerId as parameter outside of @OnTimer method.");
  }

  @Override
  public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
    // Named @OnTimer to agree with the sibling key/timerId/onTimerContext messages.
    throw new UnsupportedOperationException(
        "Cannot access time domain outside of @OnTimer method.");
  }

  @Override
  public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.windowedReceiver(this, null);
  }

  @Override
  public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.rowReceiver(this, null, mainOutputSchemaCoder);
  }

  @Override
  public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
  }

  @Override
  public BundleFinalizer bundleFinalizer() {
    return bundleFinalizer;
  }

  @Override
  public Object restriction() {
    return currentRestriction;
  }

  @Override
  public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
    throw new UnsupportedOperationException(
        "Cannot access OnTimerContext outside of @OnTimer methods.");
  }

  @Override
  public RestrictionTracker<?, ?> restrictionTracker() {
    return currentTracker;
  }

  @Override
  public PipelineOptions getPipelineOptions() {
    return pipelineOptions;
  }

  @Override
  public PipelineOptions pipelineOptions() {
    return pipelineOptions;
  }

  /** Emits to the main output using the current element's timestamp. */
  @Override
  public void output(OutputT output) {
    outputWithTimestamp(output, currentElement.getTimestamp());
  }

  /** Emits to the tagged output using the current element's timestamp. */
  @Override
  public <T> void output(TupleTag<T> tag, T output) {
    outputWithTimestamp(tag, output, currentElement.getTimestamp());
  }

  @Override
  public InputT element() {
    return currentElement.getValue();
  }

  @Override
  public Instant timestamp() {
    return currentElement.getTimestamp();
  }

  @Override
  public PaneInfo pane() {
    return currentElement.getPane();
  }

  @Override
  public Object watermarkEstimatorState() {
    return currentWatermarkEstimatorState;
  }

  @Override
  public WatermarkEstimator<?> watermarkEstimator() {
    return currentWatermarkEstimator;
  }
}
/**
 * Provides arguments for a {@link DoFnInvoker} for {@link
 * DoFn.OnWindowExpiration @OnWindowExpiration}.
 *
 * <p>Everything is answered from the current timer and current window: outputs default to the
 * timer's hold timestamp and carry the timer's pane.
 */
private class OnWindowExpirationContext<K> extends BaseArgumentProvider<InputT, OutputT> {
  /** DoFn-facing context whose outputs go to the current window. */
  private class Context extends DoFn<InputT, OutputT>.OnWindowExpirationContext {
    private Context() {
      doFn.super();
    }

    @Override
    public PipelineOptions getPipelineOptions() {
      return pipelineOptions;
    }

    @Override
    public BoundedWindow window() {
      return currentWindow;
    }

    @Override
    public void output(OutputT output) {
      outputTo(
          mainOutputConsumers,
          WindowedValue.of(
              output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane()));
    }

    @Override
    public void outputWithTimestamp(OutputT output, Instant timestamp) {
      checkOnWindowExpirationTimestamp(timestamp);
      outputTo(
          mainOutputConsumers,
          WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane()));
    }

    @Override
    public <T> void output(TupleTag<T> tag, T output) {
      outputTo(
          consumersFor(tag),
          WindowedValue.of(
              output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane()));
    }

    @Override
    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
      checkOnWindowExpirationTimestamp(timestamp);
      outputTo(
          consumersFor(tag),
          WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane()));
    }

    /**
     * Looks up the consumers registered under the tag's id.
     *
     * @throws IllegalArgumentException if none are registered
     */
    private <T> Collection<FnDataReceiver<WindowedValue<T>>> consumersFor(TupleTag<T> tag) {
      Collection<FnDataReceiver<WindowedValue<T>>> taggedConsumers =
          (Collection) localNameToConsumer.get(tag.getId());
      if (taggedConsumers == null) {
        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
      }
      return taggedConsumers;
    }

    /**
     * Requires the timestamp to be no earlier than the timer's hold timestamp minus the allowed
     * skew and no later than {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
     *
     * @throws IllegalArgumentException if the timestamp is outside that range
     */
    @SuppressWarnings(
        "deprecation") // Allowed Skew is deprecated for users, but must be respected
    private void checkOnWindowExpirationTimestamp(Instant timestamp) {
      Instant earliestAllowed;
      try {
        earliestAllowed = currentTimer.getHoldTimestamp().minus(doFn.getAllowedTimestampSkew());
      } catch (ArithmeticException e) {
        // The subtraction overflowed, so fall back to the minimum timestamp.
        earliestAllowed = BoundedWindow.TIMESTAMP_MIN_VALUE;
      }

      if (!timestamp.isBefore(earliestAllowed)
          && !timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
        return;
      }

      Object skewDescription;
      if (doFn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE) {
        skewDescription = doFn.getAllowedTimestampSkew();
      } else {
        skewDescription =
            PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod());
      }
      throw new IllegalArgumentException(
          String.format(
              "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
                  + "timestamp of the timer (%s) minus the allowed skew (%s) and no later "
                  + "than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on "
                  + "changing the allowed skew.",
              timestamp,
              currentTimer.getHoldTimestamp(),
              skewDescription,
              BoundedWindow.TIMESTAMP_MAX_VALUE));
    }
  }

  // A single context instance backs all output receivers handed to the DoFn.
  private final OnWindowExpirationContext.Context context =
      new OnWindowExpirationContext.Context();

  @Override
  public BoundedWindow window() {
    return currentWindow;
  }

  @Override
  public Instant timestamp(DoFn<InputT, OutputT> doFn) {
    return currentTimer.getHoldTimestamp();
  }

  @Override
  public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
    return currentTimeDomain;
  }

  @Override
  public K key() {
    return (K) currentTimer.getUserKey();
  }

  @Override
  public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.windowedReceiver(context, null);
  }

  @Override
  public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.rowReceiver(context, null, mainOutputSchemaCoder);
  }

  @Override
  public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.windowedMultiReceiver(context);
  }

  /**
   * Binds the state declared under {@code stateId} to the state accessor.
   *
   * @param alwaysFetched when true, {@code readLater()} is called on the bound state before it is
   *     returned
   * @throws NullPointerException if the DoFn signature declares no state with that id
   */
  @Override
  public State state(String stateId, boolean alwaysFetched) {
    StateDeclaration declaration = doFnSignature.stateDeclarations().get(stateId);
    checkNotNull(declaration, "No state declaration found for %s", stateId);
    StateSpec<?> spec;
    try {
      spec = (StateSpec<?>) declaration.field().get(doFn);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    }
    State bound = spec.bind(stateId, stateAccessor);
    if (!alwaysFetched) {
      return bound;
    }
    return (State) ((ReadableState) bound).readLater();
  }

  @Override
  public PipelineOptions pipelineOptions() {
    return pipelineOptions;
  }

  @Override
  public String getErrorContext() {
    return "FnApiDoFnRunner/OnWindowExpiration";
  }
}
/**
 * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer @OnTimer}.
 *
 * <p>Everything is answered from the current timer, current window and current time domain:
 * outputs default to the timer's hold timestamp and carry the timer's pane, and timers created
 * here use the firing timer's hold and fire timestamps as their base.
 */
private class OnTimerContext<K> extends BaseArgumentProvider<InputT, OutputT> {
  /** DoFn-facing context whose outputs go to the current window. */
  private class Context extends DoFn<InputT, OutputT>.OnTimerContext {
    private Context() {
      doFn.super();
    }

    @Override
    public PipelineOptions getPipelineOptions() {
      return pipelineOptions;
    }

    @Override
    public BoundedWindow window() {
      return currentWindow;
    }

    @Override
    public void output(OutputT output) {
      checkTimerTimestamp(currentTimer.getHoldTimestamp());
      outputTo(
          mainOutputConsumers,
          WindowedValue.of(
              output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane()));
    }

    @Override
    public void outputWithTimestamp(OutputT output, Instant timestamp) {
      checkTimerTimestamp(timestamp);
      outputTo(
          mainOutputConsumers,
          WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane()));
    }

    @Override
    public <T> void output(TupleTag<T> tag, T output) {
      checkTimerTimestamp(currentTimer.getHoldTimestamp());
      outputTo(
          consumersFor(tag),
          WindowedValue.of(
              output, currentTimer.getHoldTimestamp(), currentWindow, currentTimer.getPane()));
    }

    @Override
    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
      checkTimerTimestamp(timestamp);
      outputTo(
          consumersFor(tag),
          WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane()));
    }

    @Override
    public TimeDomain timeDomain() {
      return currentTimeDomain;
    }

    @Override
    public Instant fireTimestamp() {
      return currentTimer.getFireTimestamp();
    }

    @Override
    public Instant timestamp() {
      return currentTimer.getHoldTimestamp();
    }

    /**
     * Looks up the consumers registered under the tag's id.
     *
     * @throws IllegalArgumentException if none are registered
     */
    private <T> Collection<FnDataReceiver<WindowedValue<T>>> consumersFor(TupleTag<T> tag) {
      Collection<FnDataReceiver<WindowedValue<T>>> taggedConsumers =
          (Collection) localNameToConsumer.get(tag.getId());
      if (taggedConsumers == null) {
        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
      }
      return taggedConsumers;
    }

    /**
     * Requires the timestamp to be no earlier than the timer's hold timestamp minus the allowed
     * skew and no later than {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
     *
     * @throws IllegalArgumentException if the timestamp is outside that range
     */
    @SuppressWarnings(
        "deprecation") // Allowed Skew is deprecated for users, but must be respected
    private void checkTimerTimestamp(Instant timestamp) {
      Instant earliestAllowed;
      try {
        earliestAllowed = currentTimer.getHoldTimestamp().minus(doFn.getAllowedTimestampSkew());
      } catch (ArithmeticException e) {
        // The subtraction overflowed, so fall back to the minimum timestamp.
        earliestAllowed = BoundedWindow.TIMESTAMP_MIN_VALUE;
      }

      if (!timestamp.isBefore(earliestAllowed)
          && !timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
        return;
      }

      Object skewDescription;
      if (doFn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE) {
        skewDescription = doFn.getAllowedTimestampSkew();
      } else {
        skewDescription =
            PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod());
      }
      throw new IllegalArgumentException(
          String.format(
              "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
                  + "timestamp of the timer (%s) minus the allowed skew (%s) and no later "
                  + "than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on "
                  + "changing the allowed skew.",
              timestamp,
              currentTimer.getHoldTimestamp(),
              skewDescription,
              BoundedWindow.TIMESTAMP_MAX_VALUE));
    }
  }

  // A single context instance is handed to the DoFn and backs all output receivers.
  private final OnTimerContext.Context context = new OnTimerContext.Context();

  @Override
  public BoundedWindow window() {
    return currentWindow;
  }

  @Override
  public Instant timestamp(DoFn<InputT, OutputT> doFn) {
    return currentTimer.getHoldTimestamp();
  }

  @Override
  public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
    return currentTimeDomain;
  }

  @Override
  public K key() {
    return (K) currentTimer.getUserKey();
  }

  @Override
  public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.windowedReceiver(context, null);
  }

  @Override
  public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.rowReceiver(context, null, mainOutputSchemaCoder);
  }

  @Override
  public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
    return DoFnOutputReceivers.windowedMultiReceiver(context);
  }

  @Override
  public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
    return context;
  }

  /**
   * Binds the state declared under {@code stateId} to the state accessor.
   *
   * @param alwaysFetched when true, {@code readLater()} is called on the bound state before it is
   *     returned
   * @throws NullPointerException if the DoFn signature declares no state with that id
   */
  @Override
  public State state(String stateId, boolean alwaysFetched) {
    StateDeclaration declaration = doFnSignature.stateDeclarations().get(stateId);
    checkNotNull(declaration, "No state declaration found for %s", stateId);
    StateSpec<?> spec;
    try {
      spec = (StateSpec<?>) declaration.field().get(doFn);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    }
    State bound = spec.bind(stateId, stateAccessor);
    if (!alwaysFetched) {
      return bound;
    }
    return (State) ((ReadableState) bound).readLater();
  }

  /** Creates a timer for the firing timer's key, based on its hold and fire timestamps. */
  @Override
  public org.apache.beam.sdk.state.Timer timer(String timerId) {
    TimeDomain timeDomain =
        translateTimeDomain(parDoPayload.getTimerFamilySpecsMap().get(timerId).getTimeDomain());
    return new FnApiTimer(
        timerId,
        currentTimer.getUserKey(),
        "",
        currentWindow,
        currentTimer.getHoldTimestamp(),
        currentTimer.getFireTimestamp(),
        currentTimer.getPane(),
        timeDomain);
  }

  /** Creates a timer map for the firing timer's key, based on its hold and fire timestamps. */
  @Override
  public TimerMap timerFamily(String timerFamilyId) {
    return new FnApiTimerMap(
        timerFamilyId,
        currentTimer.getUserKey(),
        currentWindow,
        currentTimer.getHoldTimestamp(),
        currentTimer.getFireTimestamp(),
        currentTimer.getPane());
  }

  @Override
  public String timerId(DoFn<InputT, OutputT> doFn) {
    // Timer id is aliased to dynamic timer tag in a TimerFamily timer.
    return currentTimer.getDynamicTimerTag();
  }

  @Override
  public PipelineOptions pipelineOptions() {
    return pipelineOptions;
  }

  @Override
  public String getErrorContext() {
    return "FnApiDoFnRunner/OnTimer";
  }
}
/**
 * Converts the portable (proto) time domain into the SDK's {@link TimeDomain}.
 *
 * @param domain the proto enum value to translate
 * @return the matching SDK time domain
 * @throws IllegalArgumentException if the value is neither event time nor processing time
 */
private TimeDomain translateTimeDomain(
    org.apache.beam.model.pipeline.v1.RunnerApi.TimeDomain.Enum domain) {
  final TimeDomain translated;
  switch (domain) {
    case EVENT_TIME:
      translated = TimeDomain.EVENT_TIME;
      break;
    case PROCESSING_TIME:
      translated = TimeDomain.PROCESSING_TIME;
      break;
    default:
      throw new IllegalArgumentException("Unknown time domain");
  }
  return translated;
}
}