blob: 6951e8bd80a09be2af5a01c6f89a808ef9a0d02a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A base class providing simple set up, processing, and tear down for a wrapped {@link
* GroupAlsoByWindowFn}.
*
* <p>Subclasses override just a method to provide a {@link DoFnInfo} for the wrapped {@link
* GroupAlsoByWindowFn}.
*/
public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
// TODO: Remove once Distributions has shipped.
@VisibleForTesting
static final String OUTPUTS_PER_ELEMENT_EXPERIMENT = "outputs_per_element_counter";
private static final String COUNTER_NAME = "per-element-output-count";
private static final Logger LOG = LoggerFactory.getLogger(SimpleParDoFn.class);
protected final PipelineOptions options;
private final DoFnInstanceManager doFnInstanceManager;
private final SideInputReader sideInputReader;
private final DataflowOperationContext operationContext;
private final TupleTag<OutputT> mainOutputTag;
private final Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices;
private final List<TupleTag<?>> sideOutputTags;
private final DataflowExecutionContext.DataflowStepContext stepContext;
private final DataflowExecutionContext.DataflowStepContext userStepContext;
private final CounterFactory counterFactory;
private final DoFnRunnerFactory runnerFactory;
private final boolean hasStreamingSideInput;
private final OutputsPerElementTracker outputsPerElementTracker;
private final DoFnSchemaInformation doFnSchemaInformation;
// Various DoFn helpers, null between bundles
@Nullable private DoFnRunner<InputT, OutputT> fnRunner;
@Nullable DoFnInfo<InputT, OutputT> fnInfo;
@Nullable private Receiver[] receivers;
// This may additionally be null if it is not a real DoFn but an OldDoFn or
// GroupAlsoByWindowViaWindowSetDoFn
@Nullable private DoFnSignature fnSignature;
/** Creates a {@link SimpleParDoFn} using basic information about the step being executed. */
SimpleParDoFn(
PipelineOptions options,
DoFnInstanceManager doFnInstanceManager,
SideInputReader sideInputReader,
TupleTag<OutputT> mainOutputTag,
Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
DataflowExecutionContext.DataflowStepContext stepContext,
DataflowOperationContext operationContext,
DoFnSchemaInformation doFnSchemaInformation,
DoFnRunnerFactory runnerFactory) {
this.options = options;
this.doFnInstanceManager = doFnInstanceManager;
// We vend a freshly deserialized version for each run
this.sideInputReader = sideInputReader;
this.operationContext = operationContext;
checkArgument(!outputTupleTagsToReceiverIndices.isEmpty(), "expected at least one output");
this.mainOutputTag = mainOutputTag;
this.outputTupleTagsToReceiverIndices = outputTupleTagsToReceiverIndices;
ImmutableList.Builder<TupleTag<?>> sideOutputTagsBuilder = ImmutableList.builder();
for (TupleTag<?> tag : outputTupleTagsToReceiverIndices.keySet()) {
if (!mainOutputTag.equals(tag)) {
sideOutputTagsBuilder.add(tag);
}
}
this.sideOutputTags = sideOutputTagsBuilder.build();
this.stepContext = stepContext;
// StepContext provides a TimerInternals and StateInternals for use by the system - this class.
// For the user, we request a user-scoped StepContext to provide a user-scoped
// StateInternals and TimerInternals.
this.userStepContext = stepContext.namespacedToUser();
this.counterFactory = operationContext.counterFactory();
this.runnerFactory = runnerFactory;
this.hasStreamingSideInput =
options.as(StreamingOptions.class).isStreaming() && !sideInputReader.isEmpty();
this.outputsPerElementTracker = createOutputsPerElementTracker();
this.doFnSchemaInformation = doFnSchemaInformation;
}
private OutputsPerElementTracker createOutputsPerElementTracker() {
// TODO: Remove once Distributions has shipped.
if (!hasExperiment(OUTPUTS_PER_ELEMENT_EXPERIMENT)) {
return NoopOutputsPerElementTracker.INSTANCE;
}
// TODO: Remove log statement when functionality is enabled by default.
LOG.info("{} counter enabled.", COUNTER_NAME);
return new OutputsPerElementTrackerImpl();
}
private boolean hasExperiment(String experiment) {
List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
return experiments != null && experiments.contains(experiment);
}
/** Simple state tracker to calculate PerElementOutputCount counter. */
private interface OutputsPerElementTracker {
void onOutput();
void onProcessElement();
void onProcessElementSuccess();
}
private class OutputsPerElementTrackerImpl implements OutputsPerElementTracker {
private long outputsPerElement;
private final Counter<Long, CounterFactory.CounterDistribution> counter;
public OutputsPerElementTrackerImpl() {
this.counter =
counterFactory.distribution(
CounterName.named(COUNTER_NAME).withOriginalName(stepContext.getNameContext()));
}
@Override
public void onProcessElement() {
reset();
}
@Override
public void onOutput() {
outputsPerElement++;
}
@Override
public void onProcessElementSuccess() {
counter.addValue(outputsPerElement);
reset();
}
private void reset() {
outputsPerElement = 0L;
}
}
/** No-op {@link OutputsPerElementTracker} implementation used when the counter is disabled. */
private static class NoopOutputsPerElementTracker implements OutputsPerElementTracker {
private NoopOutputsPerElementTracker() {}
public static final OutputsPerElementTracker INSTANCE = new NoopOutputsPerElementTracker();
@Override
public void onOutput() {}
@Override
public void onProcessElement() {}
@Override
public void onProcessElementSuccess() {}
}
@Override
public void startBundle(Receiver... receivers) throws Exception {
checkArgument(
receivers.length == outputTupleTagsToReceiverIndices.size(),
"unexpected number of receivers for DoFn");
this.receivers = receivers;
if (hasStreamingSideInput) {
// There is non-trivial setup that needs to be performed for watermark propagation
// even on empty bundles.
reallyStartBundle();
}
}
private void reallyStartBundle() throws Exception {
checkState(fnRunner == null, "bundle already started (or not properly finished)");
OutputManager outputManager =
new OutputManager() {
final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new HashMap<>();
@Nullable
private Receiver getReceiverOrNull(TupleTag<?> tag) {
Integer receiverIndex = outputTupleTagsToReceiverIndices.get(tag);
if (receiverIndex != null) {
return receivers[receiverIndex];
} else {
return undeclaredOutputs.get(tag);
}
}
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
outputsPerElementTracker.onOutput();
Receiver receiver = getReceiverOrNull(tag);
if (receiver == null) {
// A new undeclared output.
// TODO: plumb through the operationName, so that we can
// name implicit outputs after it.
String outputName = "implicit-" + tag.getId();
// TODO: plumb through the counter prefix, so we can
// make it available to the OutputReceiver class in case
// it wants to use it in naming output counterFactory. (It
// doesn't today.)
OutputReceiver undeclaredReceiver = new OutputReceiver();
ElementCounter outputCounter =
new DataflowOutputCounter(
outputName, counterFactory, stepContext.getNameContext());
undeclaredReceiver.addOutputCounter(outputCounter);
undeclaredOutputs.put(tag, undeclaredReceiver);
receiver = undeclaredReceiver;
}
try {
receiver.process(output);
} catch (RuntimeException | Error e) {
// Rethrow unchecked exceptions as-is to avoid excessive nesting
// via a chain of DoFn's.
throw e;
} catch (Exception e) {
// This should never happen in practice with DoFn's, but can happen
// with other Receivers.
throw new RuntimeException(e);
}
}
};
fnInfo = (DoFnInfo) doFnInstanceManager.get();
fnSignature = DoFnSignatures.getSignature(fnInfo.getDoFn().getClass());
fnRunner =
runnerFactory.createRunner(
fnInfo.getDoFn(),
options,
mainOutputTag,
sideOutputTags,
fnInfo.getSideInputViews(),
sideInputReader,
fnInfo.getInputCoder(),
fnInfo.getOutputCoders(),
fnInfo.getWindowingStrategy(),
stepContext,
userStepContext,
outputManager,
doFnSchemaInformation);
fnRunner.startBundle();
}
@Override
@SuppressWarnings("unchecked")
public void processElement(Object untypedElem) throws Exception {
if (fnRunner == null) {
// If we need to run reallyStartBundle in here, we need to make sure to switch the state
// sampler into the start state.
try (Closeable start = operationContext.enterStart()) {
reallyStartBundle();
}
}
WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
registerStateCleanup(
(WindowingStrategy<?, BoundedWindow>) getDoFnInfo().getWindowingStrategy(),
(Collection<BoundedWindow>) elem.getWindows());
}
outputsPerElementTracker.onProcessElement();
fnRunner.processElement(elem);
outputsPerElementTracker.onProcessElementSuccess();
}
@Override
public void processTimers() throws Exception {
// Note: We need to get windowCoder to decode the timers. If we haven't already deserialized
// the fnInfo, we peek at a new instance to retrieve that. If this extra deserialization becomes
// excessively costly, we could either (1) have the DoFnInstanceManager remember the associated
// windowCoder (allowing us to get it without a DoFnInfo instance) or (2) check whether timers
// exist without actually decoding them.
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>)
(fnInfo != null ? fnInfo : doFnInstanceManager.peek())
.getWindowingStrategy()
.getWindowFn()
.windowCoder();
processTimers(TimerType.USER, userStepContext, windowCoder);
processTimers(TimerType.SYSTEM, stepContext, windowCoder);
}
private void processUserTimer(TimerData timer) throws Exception {
if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())) {
BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
}
}
private void processSystemTimer(TimerData timer) throws Exception {
// Timer owned by this class, for cleaning up state in expired windows
if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) {
checkState(
timer.getDomain().equals(TimeDomain.EVENT_TIME),
"%s received cleanup timer with domain not EVENT_TIME: %s",
this,
timer);
checkState(
timer.getNamespace() instanceof WindowNamespace,
"%s received cleanup timer not for a %s: %s",
this,
WindowNamespace.class.getSimpleName(),
timer);
BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
Instant targetTime = earliestAllowableCleanupTime(window, fnInfo.getWindowingStrategy());
checkState(
!targetTime.isAfter(timer.getTimestamp()),
"%s received state cleanup timer for window %s "
+ " that is before the appropriate cleanup time %s",
this,
window,
targetTime);
// This is for a timer for a window that is expired, so clean it up.
for (StateDeclaration stateDecl : fnSignature.stateDeclarations().values()) {
StateTag<?> tag;
try {
tag =
StateTags.tagForSpec(
stateDecl.id(), (StateSpec) stateDecl.field().get(fnInfo.getDoFn()));
} catch (IllegalAccessException e) {
throw new RuntimeException(
String.format(
"Error accessing %s for %s",
StateSpec.class.getName(), fnInfo.getDoFn().getClass().getName()),
e);
}
StateInternals stateInternals = userStepContext.stateInternals();
org.apache.beam.sdk.state.State state = stateInternals.state(timer.getNamespace(), tag);
state.clear();
}
}
}
@Override
public void finishBundle() throws Exception {
if (fnRunner != null) {
fnRunner.finishBundle();
doFnInstanceManager.complete(fnInfo);
fnRunner = null;
fnInfo = null;
fnSignature = null;
}
}
@Override
public void abort() throws Exception {
doFnInstanceManager.abort(fnInfo);
fnRunner = null;
fnInfo = null;
}
@VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
private enum TimerType {
USER {
@Override
public void processTimer(SimpleParDoFn doFn, TimerData timer) throws Exception {
doFn.processUserTimer(timer);
}
},
SYSTEM {
@Override
public void processTimer(SimpleParDoFn doFn, TimerData timer) throws Exception {
doFn.processSystemTimer(timer);
}
};
public abstract void processTimer(SimpleParDoFn doFn, TimerData timer) throws Exception;
};
private void processTimers(
TimerType mode,
DataflowExecutionContext.DataflowStepContext context,
Coder<BoundedWindow> windowCoder)
throws Exception {
TimerData timer = context.getNextFiredTimer(windowCoder);
if (timer != null && fnRunner == null) {
// If we need to run reallyStartBundle in here, we need to make sure to switch the state
// sampler into the start state.
try (Closeable start = operationContext.enterStart()) {
reallyStartBundle();
}
}
while (timer != null) {
mode.processTimer(this, timer);
timer = context.getNextFiredTimer(windowCoder);
}
}
private <W extends BoundedWindow> void registerStateCleanup(
WindowingStrategy<?, W> windowingStrategy, Collection<W> windowsToCleanup) {
Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
for (W window : windowsToCleanup) {
// The stepContext is the thing that know if it is batch or streaming, hence
// whether state needs to be cleaned up or will simply be discarded so the
// timer can be ignored
stepContext.setStateCleanupTimer(
CLEANUP_TIMER_ID,
window,
windowCoder,
earliestAllowableCleanupTime(window, windowingStrategy));
}
}
private Instant earliestAllowableCleanupTime(
BoundedWindow window, WindowingStrategy windowingStrategy) {
return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).plus(1L);
}
/**
* Returns the {@link DoFnInfo} currently being used by this {@link SimpleParDoFn}.
*
* <p>May be null if no element has been processed yet, or if the {@link SimpleParDoFn} has
* finished.
*/
@Nullable
@VisibleForTesting
DoFnInfo<?, ?> getDoFnInfo() {
return fnInfo;
}
}