blob: 14a9502c1a9fb8dbe3ee9f3246cc09c890c7a402 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
/**
* A customized {@link DoFnRunner} that handles late data dropping and garbage collection for
* stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)}
* and does cleanup in {@link #onTimer(String, BoundedWindow, Instant, TimeDomain)}
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<InputT, OutputT> {
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";
private final DoFnRunner<InputT, OutputT> doFnRunner;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Counter droppedDueToLateness =
Metrics.counter(StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);
private final CleanupTimer<InputT> cleanupTimer;
private final StateCleaner stateCleaner;
public StatefulDoFnRunner(
DoFnRunner<InputT, OutputT> doFnRunner,
WindowingStrategy<?, ?> windowingStrategy,
CleanupTimer<InputT> cleanupTimer,
StateCleaner<W> stateCleaner) {
this.doFnRunner = doFnRunner;
this.windowingStrategy = windowingStrategy;
this.cleanupTimer = cleanupTimer;
this.stateCleaner = stateCleaner;
WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
rejectMergingWindowFn(windowFn);
}
private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
if (!(windowFn instanceof NonMergingWindowFn)) {
throw new UnsupportedOperationException(
"MergingWindowFn is not supported for stateful DoFns, WindowFn is: " + windowFn);
}
}
@Override
public DoFn<InputT, OutputT> getFn() {
return doFnRunner.getFn();
}
@Override
public void startBundle() {
doFnRunner.startBundle();
}
@Override
public void processElement(WindowedValue<InputT> input) {
// StatefulDoFnRunner always observes windows, so we need to explode
for (WindowedValue<InputT> value : input.explodeWindows()) {
BoundedWindow window = value.getWindows().iterator().next();
if (isLate(window)) {
// The element is too late for this window.
droppedDueToLateness.inc();
WindowTracing.debug(
"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+ "since too far behind inputWatermark:{}",
input.getTimestamp(),
window,
cleanupTimer.currentInputWatermarkTime());
} else {
cleanupTimer.setForWindow(value.getValue(), window);
doFnRunner.processElement(value);
}
}
}
private boolean isLate(BoundedWindow window) {
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
Instant inputWM = cleanupTimer.currentInputWatermarkTime();
return gcTime.isBefore(inputWM);
}
@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
} else {
// An event-time timer can never be late because we don't allow setting timers after GC time.
// Ot can happen that a processing-time time fires for a late window, we need to ignore
// this.
if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {
// don't increment the dropped counter, only do that for elements
WindowTracing.debug(
"StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "
+ "since window is too far behind inputWatermark:{}",
timestamp,
window,
cleanupTimer.currentInputWatermarkTime());
} else {
doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
}
}
}
@Override
public void finishBundle() {
doFnRunner.finishBundle();
}
/**
* A cleaner for deciding when to clean state of window.
*
* <p>A runner might either (a) already know that it always has a timer set for the expiration
* time or (b) not need a timer at all because it is a batch runner that discards state when it is
* done.
*/
public interface CleanupTimer<InputT> {
/**
* Return the current, local input watermark timestamp for this computation in the {@link
* TimeDomain#EVENT_TIME} time domain.
*/
Instant currentInputWatermarkTime();
/** Set the garbage collect time of the window to timer. */
void setForWindow(InputT value, BoundedWindow window);
/** Checks whether the given timer is a cleanup timer for the window. */
boolean isForWindow(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
}
/** A cleaner to clean all states of the window. */
public interface StateCleaner<W extends BoundedWindow> {
void clearForWindow(W window);
}
/** A {@link StatefulDoFnRunner.CleanupTimer} implemented via {@link TimerInternals}. */
public static class TimeInternalsCleanupTimer<InputT>
implements StatefulDoFnRunner.CleanupTimer<InputT> {
public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
/**
* The amount of milliseconds by which to delay cleanup. We use this to ensure that state is
* still available when a user timer for {@code window.maxTimestamp()} fires.
*/
public static final long GC_DELAY_MS = 1;
private final TimerInternals timerInternals;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Coder<BoundedWindow> windowCoder;
public TimeInternalsCleanupTimer(
TimerInternals timerInternals, WindowingStrategy<?, ?> windowingStrategy) {
this.windowingStrategy = windowingStrategy;
WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
this.timerInternals = timerInternals;
}
@Override
public Instant currentInputWatermarkTime() {
return timerInternals.currentInputWatermarkTime();
}
@Override
public void setForWindow(InputT input, BoundedWindow window) {
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
// make sure this fires after any window.maxTimestamp() timers
gcTime = gcTime.plus(GC_DELAY_MS);
timerInternals.setTimer(
StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
}
@Override
public boolean isForWindow(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
gcTime = gcTime.plus(GC_DELAY_MS);
return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
}
}
/** A {@link StatefulDoFnRunner.StateCleaner} implemented via {@link StateInternals}. */
public static class StateInternalsStateCleaner<W extends BoundedWindow>
implements StatefulDoFnRunner.StateCleaner<W> {
private final DoFn<?, ?> fn;
private final DoFnSignature signature;
private final StateInternals stateInternals;
private final Coder<W> windowCoder;
public StateInternalsStateCleaner(
DoFn<?, ?> fn, StateInternals stateInternals, Coder<W> windowCoder) {
this.fn = fn;
this.signature = DoFnSignatures.getSignature(fn.getClass());
this.stateInternals = stateInternals;
this.windowCoder = windowCoder;
}
@Override
public void clearForWindow(W window) {
for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
signature.stateDeclarations().entrySet()) {
try {
StateSpec<?> spec = (StateSpec<?>) entry.getValue().field().get(fn);
State state =
stateInternals.state(
StateNamespaces.window(windowCoder, window),
StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
state.clear();
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
}
}