blob: c249012b3467c767fd95acf8a173b1ed9eced5d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.splittabledofn;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import java.util.List;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.util.Timestamps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;
/**
* Helper class for feeding element/restricton pairs into a {@link
* PTransformTranslation#SPLITTABLE_PROCESS_ELEMENTS_URN} transform, implementing checkpointing
* only, by using state and timers for storing the last element/restriction pair, similarly to
* {@link org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn} but in a portable
* fashion.
*/
public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
private final Coder<BoundedWindow> windowCoder;
private final Coder<WindowedValue<KV<InputT, RestrictionT>>> elementRestrictionWireCoder;
private final StateInternals stateInternals;
private final TimerInternals timerInternals;
private StateNamespace stateNamespace;
private final StateTag<ValueState<WindowedValue<KV<InputT, RestrictionT>>>> seedTag;
private ValueState<WindowedValue<KV<InputT, RestrictionT>>> seedState;
private final StateTag<ValueState<RestrictionT>> restrictionTag;
private ValueState<RestrictionT> restrictionState;
private StateTag<WatermarkHoldState> watermarkHoldTag =
StateTags.makeSystemTagInternal(
StateTags.<GlobalWindow>watermarkStateInternal("hold", TimestampCombiner.LATEST));
private WatermarkHoldState holdState;
private Instant inputTimestamp;
private List<BundleApplication> primaryRoots;
private List<DelayedBundleApplication> residualRoots;
/** Initializes the feeder. */
public SDFFeederViaStateAndTimers(
StateInternals stateInternals,
TimerInternals timerInternals,
Coder<InputT> elementWireCoder,
Coder<RestrictionT> restrictionWireCoder,
Coder<BoundedWindow> windowCoder) {
this.stateInternals = stateInternals;
this.timerInternals = timerInternals;
this.windowCoder = windowCoder;
this.elementRestrictionWireCoder =
FullWindowedValueCoder.of(KvCoder.of(elementWireCoder, restrictionWireCoder), windowCoder);
this.seedTag = StateTags.value("seed", elementRestrictionWireCoder);
this.restrictionTag = StateTags.value("restriction", restrictionWireCoder);
}
/** Passes the initial element/restriction pair. */
public void seed(WindowedValue<KV<InputT, RestrictionT>> elementRestriction) {
initState(
StateNamespaces.window(
windowCoder, Iterables.getOnlyElement(elementRestriction.getWindows())));
seedState.write(elementRestriction);
inputTimestamp = elementRestriction.getTimestamp();
}
/**
* Resumes from a timer and returns the current element/restriction pair (with an up-to-date value
* of the restriction).
*/
public WindowedValue<KV<InputT, RestrictionT>> resume(TimerData timer) {
initState(timer.getNamespace());
WindowedValue<KV<InputT, RestrictionT>> seed = seedState.read();
inputTimestamp = seed.getTimestamp();
return seed.withValue(KV.of(seed.getValue().getKey(), restrictionState.read()));
}
/**
* Commits the state and timers: clears both if no checkpoint happened, or adjusts the restriction
* and sets a wake-up timer if a checkpoint happened.
*/
public void commit() throws IOException {
if (primaryRoots == null) {
// No split - the call terminated.
seedState.clear();
restrictionState.clear();
holdState.clear();
return;
}
// For now can only happen on the first instruction which is SPLITTABLE_PROCESS_ELEMENTS.
checkArgument(residualRoots.size() == 1, "More than 1 residual is unsupported for now");
DelayedBundleApplication residual = residualRoots.get(0);
ByteString encodedResidual = residual.getApplication().getElement();
WindowedValue<KV<InputT, RestrictionT>> decodedResidual =
elementRestrictionWireCoder.decode(encodedResidual.newInput());
restrictionState.write(decodedResidual.getValue().getValue());
Instant watermarkHold =
residual.getApplication().getOutputWatermarksMap().isEmpty()
? inputTimestamp
: new Instant(
Iterables.getOnlyElement(
residual.getApplication().getOutputWatermarksMap().values()));
checkArgument(
!watermarkHold.isBefore(inputTimestamp),
"Watermark hold %s can not be before input timestamp %s",
watermarkHold,
inputTimestamp);
holdState.add(watermarkHold);
Instant requestedWakeupTime =
new Instant(Timestamps.toMillis(residual.getRequestedExecutionTime()));
Instant wakeupTime =
timerInternals.currentProcessingTime().isBefore(requestedWakeupTime)
? requestedWakeupTime
: timerInternals.currentProcessingTime();
// Set a timer to continue processing this element.
timerInternals.setTimer(
stateNamespace, "sdfContinuation", wakeupTime, TimeDomain.PROCESSING_TIME);
}
/** Signals that a split happened. */
public void split(
List<BundleApplication> primaryRoots, List<DelayedBundleApplication> residualRoots) {
checkState(
this.primaryRoots == null,
"At most 1 split supported, however got new split (%s, %s) "
+ "in addition to existing (%s, %s)",
primaryRoots,
residualRoots,
this.primaryRoots,
this.residualRoots);
this.primaryRoots = primaryRoots;
this.residualRoots = residualRoots;
}
private void initState(StateNamespace ns) {
stateNamespace = ns;
seedState = stateInternals.state(ns, seedTag);
restrictionState = stateInternals.state(ns, restrictionTag);
holdState = stateInternals.state(ns, watermarkHoldTag);
}
}