/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closer;
import org.joda.time.Instant;

/** Execution context for the Dataflow worker. */
public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
private final CounterFactory counterFactory;
private final MetricsContainerRegistry<?> metricsContainerRegistry;
private final ExecutionStateTracker executionStateTracker;
protected final DataflowExecutionStateRegistry executionStateRegistry;
  // Desired limit on the number of bytes written to sinks, cumulative across all sinks when there
  // is more than one.
private final long sinkByteLimit;
private long bytesSinked = 0;
public DataflowExecutionContext(
CounterFactory counterFactory,
MetricsContainerRegistry<?> metricsRegistry,
DataflowExecutionStateTracker executionStateTracker,
DataflowExecutionStateRegistry executionStateRegistry,
long sinkByteLimit) {
this.counterFactory = counterFactory;
this.metricsContainerRegistry = metricsRegistry;
this.executionStateTracker = executionStateTracker;
this.executionStateRegistry = executionStateRegistry;
this.sinkByteLimit = sinkByteLimit;
}
// Created step contexts, keyed by step name
private Map<String, T> cachedStepContexts = new LinkedHashMap<>();
/**
* Returns a {@link SideInputReader} based on {@link SideInputInfo} descriptors and {@link
* PCollectionView PCollectionViews}.
*
* <p>If side input source metadata is provided by the service in {@link SideInputInfo
* sideInputInfos}, we request a {@link SideInputReader} from the {@code executionContext} using
* that info. If no side input source metadata is provided but the DoFn expects side inputs, as a
* fallback, we request a {@link SideInputReader} based only on the expected views.
*
* <p>These cases are not disjoint: Whenever a {@link GroupAlsoByWindowFn} takes side inputs,
* {@code doFnInfo.getSideInputViews()} should be non-empty.
*
* <p>A note on the behavior of the Dataflow service: Today, the first case corresponds to batch
* mode, while the fallback corresponds to streaming mode.
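   *
   * <p>For illustration, a hedged sketch of a call site; {@code opCtx} stands for a previously
   * created {@link DataflowOperationContext}, and the surrounding names are assumptions, not part
   * of this class:
   *
   * <pre>{@code
   * SideInputReader reader =
   *     executionContext.getSideInputReader(sideInputInfos, doFnInfo.getSideInputViews(), opCtx);
   * if (reader.isEmpty()) {
   *   // Neither service-provided metadata nor expected views were available.
   * }
   * }</pre>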
*/
public SideInputReader getSideInputReader(
@Nullable Iterable<? extends SideInputInfo> sideInputInfos,
@Nullable Iterable<? extends PCollectionView<?>> views,
DataflowOperationContext operationContext)
throws Exception {
if (sideInputInfos != null && !Iterables.isEmpty(sideInputInfos)) {
return getSideInputReader(sideInputInfos, operationContext);
} else if (views != null && !Iterables.isEmpty(views)) {
return getSideInputReaderForViews(views);
} else {
return NullSideInputReader.empty();
}
}
public CounterFactory getCounterFactory() {
return counterFactory;
}
/** Returns a collection view of all of the {@link StepContext}s. */
public Collection<? extends T> getAllStepContexts() {
return Collections.unmodifiableCollection(cachedStepContexts.values());
}
/**
   * A hint to the currently executing step that it should stop processing more input because the
   * sinks are 'full'. Readers poll this to stop consuming more records when they can. Currently
   * the hint is set only by the sinks in streaming.
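   *
   * <p>A hedged sketch of how a reader might poll the hint; the reader and {@code process} are
   * hypothetical, not part of this file:
   *
   * <pre>{@code
   * while (reader.hasNext() && !executionContext.isSinkFullHintSet()) {
   *   process(reader.next());
   * }
   * }</pre>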
*/
boolean isSinkFullHintSet() {
return bytesSinked >= sinkByteLimit;
    // In addition to the hint from the sinks, we could consider other factors like global memory
    // pressure.
    // TODO: State bytes should also contribute to this hint; otherwise the state size might grow
    // unbounded.
}
/**
   * Records bytes written to a sink; once the cumulative count reaches the limit, the sink-full
   * hint is set. The hint is read by upstream producers, which stop producing if they can. Mainly
   * used in streaming.
*/
void reportBytesSinked(long bytes) {
bytesSinked += bytes;
}
protected void clearSinkFullHint() {
    // Cleared in streaming when the context is reused for a new work item.
bytesSinked = 0;
}
/**
* Returns a {@link SideInputReader} for all the side inputs described in the given {@link
* SideInputInfo} descriptors.
*/
protected abstract SideInputReader getSideInputReader(
Iterable<? extends SideInputInfo> sideInputInfos, DataflowOperationContext operationContext)
throws Exception;
protected abstract T createStepContext(DataflowOperationContext operationContext);
// TODO: Move StepContext creation to the OperationContext.
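  /**
   * Returns the {@link StepContext} for the step named by {@code operationContext}'s system name,
   * creating it on first use and caching it so that later lookups for the same step return the
   * same instance.
   */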
public T getStepContext(DataflowOperationContext operationContext) {
NameContext nameContext = operationContext.nameContext();
T context = cachedStepContexts.get(nameContext.systemName());
if (context == null) {
context = createStepContext(operationContext);
cachedStepContexts.put(nameContext.systemName(), context);
}
return context;
}
/**
* Returns a {@link SideInputReader} for all the provided views, where the execution context
* itself knows how to read data for the view.
*/
protected abstract SideInputReader getSideInputReaderForViews(
Iterable<? extends PCollectionView<?>> views) throws Exception;
/** Dataflow specific {@link StepContext}. */
public abstract static class DataflowStepContext implements StepContext {
private final NameContext nameContext;
public DataflowStepContext(NameContext nameContext) {
this.nameContext = nameContext;
}
public NameContext getNameContext() {
return nameContext;
}
/**
* Returns the next fired timer for this step.
*
* <p>The {@code windowCoder} is passed here as it is more convenient than doing so when the
* {@link DataflowStepContext} is created.
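   *
   * <p>A hedged sketch of the usual drain loop on the caller's side (the loop body is assumed):
   *
   * <pre>{@code
   * TimerData timer;
   * while ((timer = stepContext.getNextFiredTimer(windowCoder)) != null) {
   *   // ... fire the timer ...
   * }
   * }</pre>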
*/
@Nullable
public abstract <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCoder);
public abstract <W extends BoundedWindow> void setStateCleanupTimer(
String timerId, W window, Coder<W> windowCoder, Instant cleanupTime);
public abstract DataflowStepContext namespacedToUser();
}
/**
* Creates the context for an operation with the stage this execution context belongs to.
*
* @param nameContext the set of names identifying the operation
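   *
   * <p>A hedged usage sketch, assuming the four-argument {@code NameContext.create} factory; the
   * name strings are made-up placeholders:
   *
   * <pre>{@code
   * DataflowOperationContext opCtx =
   *     executionContext.createOperationContext(
   *         NameContext.create("stage", "originalName", "systemName", "userName"));
   * }</pre>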
*/
public DataflowOperationContext createOperationContext(NameContext nameContext) {
MetricsContainer metricsContainer =
metricsContainerRegistry.getContainer(
checkNotNull(
nameContext.originalName(),
"All operations must have an original name, but %s doesn't.",
nameContext));
return new DataflowOperationContext(
counterFactory,
nameContext,
metricsContainer,
executionStateTracker,
executionStateRegistry);
}
protected MetricsContainerRegistry<?> getMetricsContainerRegistry() {
return metricsContainerRegistry;
}
protected DataflowExecutionStateRegistry getExecutionStateRegistry() {
return executionStateRegistry;
}
public ExecutionStateTracker getExecutionStateTracker() {
return executionStateTracker;
}
/**
   * An extension of {@link ExecutionStateTracker} that also uses the current state to locate the
   * {@link MetricsContainer} to install in the {@link MetricsEnvironment}.
*/
public static class DataflowExecutionStateTracker extends ExecutionStateTracker {
private final ElementExecutionTracker elementExecutionTracker;
private final DataflowOperationContext.DataflowExecutionState otherState;
private final ContextActivationObserverRegistry contextActivationObserverRegistry;
private final String workItemId;
public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId) {
super(sampler);
this.elementExecutionTracker =
DataflowElementExecutionTracker.create(counterFactory, options);
this.otherState = otherState;
this.workItemId = workItemId;
this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault();
}
@Override
public Closeable activate() {
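      // A Closer closes everything registered with it, in LIFO order, when closed. Registering
      // each activation as we go means a failure part-way through unwinds the ones already done.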
Closer closer = Closer.create();
try {
closer.register(super.activate());
for (ContextActivationObserver p :
contextActivationObserverRegistry.getContextActivationObservers()) {
closer.register(p.activate(this));
}
closer.register(enterState(otherState));
return closer;
} catch (Exception e) {
try {
closer.close();
} catch (IOException suppressed) {
// Shouldn't happen -- none of the things being closed actually throw.
e.addSuppressed(suppressed);
}
throw e;
}
}
@Override
protected void takeSample(long millisSinceLastSample) {
elementExecutionTracker.takeSample(millisSinceLastSample);
super.takeSample(millisSinceLastSample);
}
@Override
public Closeable enterState(ExecutionState newState) {
Closeable baseCloseable = super.enterState(newState);
final boolean isDataflowProcessElementState =
newState.isProcessElementState && newState instanceof DataflowExecutionState;
if (isDataflowProcessElementState) {
elementExecutionTracker.enter(((DataflowExecutionState) newState).getStepName());
}
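      // Unwind in reverse order of entry: leave the element tracker first, then restore the
      // previous execution state via the base closeable.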
return () -> {
if (isDataflowProcessElementState) {
elementExecutionTracker.exit();
}
baseCloseable.close();
};
}
public String getWorkItemId() {
return this.workItemId;
}
}
}