blob: 19670f47fc8e115c3f6ce9a7e4bbc42b07533f49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WeightedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
/** {@link DataflowExecutionContext} for use in batch mode. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class BatchModeExecutionContext
extends DataflowExecutionContext<BatchModeExecutionContext.StepContext> {
protected final Cache<?, WeightedValue<?>> dataCache;
protected final Cache<?, ?> logicalReferenceCache;
protected final PipelineOptions options;
protected final ReaderFactory readerFactory;
private Object key;
private final MetricsContainerRegistry<MetricsContainerImpl> containerRegistry;
// TODO(BEAM-7863): Move throttle time Metric to a dedicated namespace.
protected static final String DATASTORE_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn";
protected static final String HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer$LoggingHttpBackOffHandler";
protected static final String BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl";
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
protected static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";
private BatchModeExecutionContext(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
PipelineOptions options,
DataflowExecutionStateTracker executionStateTracker,
DataflowExecutionStateRegistry executionStateRegistry) {
super(
counterFactory,
createMetricsContainerRegistry(),
executionStateTracker,
executionStateRegistry,
Long.MAX_VALUE);
this.logicalReferenceCache = logicalReferenceCache;
this.readerFactory = readerFactory;
this.options = options;
this.dataCache = dataCache;
this.containerRegistry =
(MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
}
private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
return new MetricsContainerRegistry<MetricsContainerImpl>() {
@Override
protected MetricsContainerImpl createContainer(String stepName) {
return new MetricsContainerImpl(stepName);
}
};
}
public static BatchModeExecutionContext forTesting(
PipelineOptions options, CounterFactory counterFactory, String stageName) {
BatchModeExecutionStateRegistry stateRegistry = new BatchModeExecutionStateRegistry();
return new BatchModeExecutionContext(
counterFactory,
CacheBuilder.newBuilder()
.maximumWeight(1_000_000) // weights are in bytes
.weigher(Weighers.fixedWeightKeys(8))
.softValues()
.recordStats()
.<Object, WeightedValue<?>>build(),
CacheBuilder.newBuilder().weakValues().build(),
ReaderRegistry.defaultRegistry(),
options,
new DataflowExecutionStateTracker(
ExecutionStateSampler.newForTest(),
stateRegistry.getState(
NameContext.forStage(stageName),
"other",
null,
ScopedProfiler.INSTANCE.emptyScope()),
counterFactory,
options,
"test-work-item-id"),
stateRegistry);
}
public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
CounterFactory counterFactory = new CounterFactory();
return forTesting(options, counterFactory, stageName);
}
/**
* A version of {@link DataflowOperationContext.DataflowExecutionState} supporting per-bundle MSEC
* counters.
*/
@VisibleForTesting
static class BatchModeExecutionState extends DataflowOperationContext.DataflowExecutionState {
// Must be volatile so that totalMillisInState modifications are atomic. This field is only
// written by the sampler thread in takeSample, and read in a separate thread within
// extractUpdate.
private volatile long totalMillisInState = 0;
// This thread is only read and written within extractUpdate, which will be called from either
// the progress reporting thread or the execution thread when producing the final update.
// The value is not used as part of producing the final update, so the fact this is used in
// multiple threads does not lead to incorrect results.
private long lastReportedMillis = 0;
public BatchModeExecutionState(
NameContext nameContext,
String stateName,
@Nullable String requestingStepName,
@Nullable Integer inputIndex,
@Nullable MetricsContainer metricsContainer,
ProfileScope profileScope) {
super(nameContext, stateName, requestingStepName, inputIndex, metricsContainer, profileScope);
}
/**
* Take sample is only called by the ExecutionStateSampler thread and it is the only place that
* writes to totalMillisInState. Thus, we don't need to synchronize these writes, but we just
* need to make sure that the value seen by readers (which are in the reporting thread) is
* consistent. This is ensured since totalMillisInState is volatile, and thus is atomic.
*/
@SuppressWarnings("NonAtomicVolatileUpdate") // Single writer.
@Override
public void takeSample(long millisSinceLastSample) {
totalMillisInState += millisSinceLastSample;
}
/**
* Extract updates in the from of a {@link CounterUpdate}.
*
* <p>Non-final updates are extracted by the progress reporting thread (single reader, different
* from the sampler thread calling takeSample).
*
* <p>Final updates are extracted by the execution thread, and will be reported after all
* processing has completed and the writer thread has been shutdown.
*/
@Override
public @Nullable CounterUpdate extractUpdate(boolean isFinalUpdate) {
long millisToReport = totalMillisInState;
if (millisToReport == lastReportedMillis && !isFinalUpdate) {
return null;
}
lastReportedMillis = millisToReport;
return createUpdate(true, totalMillisInState);
}
}
/**
* {@link DataflowExecutionStateRegistry} that creates {@link BatchModeExecutionState} instances.
*/
@VisibleForTesting
public static class BatchModeExecutionStateRegistry extends DataflowExecutionStateRegistry {
@Override
protected DataflowOperationContext.DataflowExecutionState createState(
NameContext nameContext,
String stateName,
@Nullable String requestingStepName,
@Nullable Integer inputIndex,
MetricsContainer container,
ProfileScope profileScope) {
return new BatchModeExecutionState(
nameContext, stateName, requestingStepName, inputIndex, container, profileScope);
}
}
public static BatchModeExecutionContext create(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
DataflowPipelineOptions options,
String stageName,
String workItemId) {
BatchModeExecutionStateRegistry executionStateRegistry = new BatchModeExecutionStateRegistry();
return new BatchModeExecutionContext(
counterFactory,
dataCache,
logicalReferenceCache,
readerFactory,
options,
new DataflowExecutionStateTracker(
ExecutionStateSampler.instance(),
executionStateRegistry.getState(
NameContext.forStage(stageName),
"other",
null,
ScopedProfiler.INSTANCE.emptyScope()),
counterFactory,
options,
workItemId),
executionStateRegistry);
}
/** Create a new {@link StepContext}. */
@Override
protected StepContext createStepContext(DataflowOperationContext operationContext) {
return new StepContext(operationContext);
}
/** Sets the key of the work currently being processed. */
public void setKey(Object key) {
if (!Objects.equals(key, this.key)) {
switchStateKey(key);
}
this.key = key;
}
/** @param newKey the key being switched to */
protected void switchStateKey(Object newKey) {
for (StepContext stepContext : getAllStepContexts()) {
stepContext.setKey(newKey);
}
}
/**
* Returns the key of the work currently being processed.
*
* <p>If there is not a currently defined key, returns null.
*/
public @Nullable Object getKey() {
return key;
}
@Override
protected SideInputReader getSideInputReader(
Iterable<? extends SideInputInfo> sideInputInfos, DataflowOperationContext operationContext)
throws Exception {
return new LazilyInitializedSideInputReader(
sideInputInfos,
() -> {
try {
return IsmSideInputReader.of(
sideInputInfos,
options,
BatchModeExecutionContext.this,
readerFactory,
operationContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Override
protected SideInputReader getSideInputReaderForViews(
Iterable<? extends PCollectionView<?>> sideInputViews) {
throw new UnsupportedOperationException(
"Cannot call getSideInputReaderForViews for batch DataflowWorker: "
+ "the MapTask specification should have had SideInputInfo descriptors "
+ "for each side input, and a SideInputReader provided via getSideInputReader");
}
// TODO: Expose a keyed sub-cache which allows one to store all cached values in their
// own namespace.
public <K, V> Cache<K, V> getDataCache() {
@SuppressWarnings({"unchecked", "rawtypes"})
Cache<K, V> rval = (Cache) dataCache;
return rval;
}
public <K, V> Cache<K, V> getLogicalReferenceCache() {
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"unchecked"
})
Cache<K, V> rval = (Cache) logicalReferenceCache;
return rval;
}
/** {@link DataflowStepContext} used in batch mode. */
public class StepContext extends DataflowExecutionContext.DataflowStepContext {
// State internals only for use by the system, lazily instantiated
private @Nullable InMemoryStateInternals<Object> systemStateInternals;
// State internals scoped to the user, lazily instantiated
private @Nullable InMemoryStateInternals<Object> userStateInternals;
// Timer internals scoped to the user, lazily instantiated
private @Nullable InMemoryTimerInternals userTimerInternals;
// Timer internals added for OnWindowExpiration functionality
private @Nullable InMemoryTimerInternals systemTimerInternals;
private InMemoryStateInternals<Object> getUserStateInternals() {
if (userStateInternals == null) {
userStateInternals = InMemoryStateInternals.forKey(getKey());
}
return userStateInternals;
}
private InMemoryTimerInternals getUserTimerInternals() {
if (userTimerInternals == null) {
userTimerInternals = new InMemoryTimerInternals();
}
return userTimerInternals;
}
private StepContext(DataflowOperationContext operationContext) {
super(operationContext.nameContext());
systemStateInternals = null;
userStateInternals = null;
userTimerInternals = null;
systemTimerInternals = null;
}
public void setKey(Object newKey) {
// When the key changes, wipe out existing state and timers for later
// lazy instantiation.
// In batch mode, a specific key is always processed contiguously
// because the state is either used after a GroupByKeyOnly where
// each key only occurs once, or after some ParDo's that preserved
// the key.
systemStateInternals = null;
userStateInternals = null;
userTimerInternals = null;
systemTimerInternals = null;
}
@Override
public StateInternals stateInternals() {
if (systemStateInternals == null) {
systemStateInternals = InMemoryStateInternals.forKey(getKey());
}
return systemStateInternals;
}
@Override
public InMemoryTimerInternals timerInternals() {
if (systemTimerInternals == null) {
systemTimerInternals = new InMemoryTimerInternals();
}
return systemTimerInternals;
}
@Override
public @Nullable <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCoder) {
try {
timerInternals().advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
} catch (Exception e) {
throw new IllegalStateException("Exception thrown advancing watermark", e);
}
return timerInternals().removeNextEventTimer();
}
@Override
public <W extends BoundedWindow> void setStateCleanupTimer(
String timerId,
W window,
Coder<W> windowCoder,
Instant cleanupTime,
Instant cleanupOutputTimestamp) {
timerInternals()
.setTimer(
StateNamespaces.window(windowCoder, window),
timerId,
"",
cleanupTime,
cleanupOutputTimestamp,
TimeDomain.EVENT_TIME);
}
@Override
public DataflowStepContext namespacedToUser() {
return new UserStepContext(this);
}
}
/**
* A specialized {@link StepContext} that uses provided {@link StateInternals} and {@link
* TimerInternals} for user state and timers.
*/
private static class UserStepContext extends DataflowStepContext {
private final BatchModeExecutionContext.StepContext wrapped;
public UserStepContext(BatchModeExecutionContext.StepContext wrapped) {
super(wrapped.getNameContext());
this.wrapped = wrapped;
}
@Override
public StateInternals stateInternals() {
return wrapped.getUserStateInternals();
}
@Override
public TimerInternals timerInternals() {
return wrapped.getUserTimerInternals();
}
@Override
public <W extends BoundedWindow> @Nullable TimerData getNextFiredTimer(Coder<W> windowCoder) {
// Only event time timers fire, as processing time timers are reserved until after the
// bundle is complete, so they are all delivered droppably late
//
// Note also that timers hold the _output_ watermark, but the input watermark is
// advanced to infinity when timers begin to process
try {
wrapped.getUserTimerInternals().advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
} catch (Exception e) {
throw new IllegalStateException("Exception thrown advancing watermark", e);
}
return wrapped.getUserTimerInternals().removeNextEventTimer();
}
@Override
public <W extends BoundedWindow> void setStateCleanupTimer(
String timerId,
W window,
Coder<W> windowCoder,
Instant cleanupTime,
Instant cleanupOutputTimestamp) {
throw new UnsupportedOperationException(
String.format(
"setStateCleanupTimer should not be called on %s, only on a system %s",
getClass().getSimpleName(),
BatchModeExecutionContext.StepContext.class.getSimpleName()));
}
@Override
public DataflowStepContext namespacedToUser() {
return this;
}
}
/**
* Returns {@link CounterUpdate} protos representing the latest cumulative values of all
* user-defined metrics reported within this execution context.
*/
public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
return containerRegistry
.getContainers()
.transformAndConcat(
container -> {
MetricUpdates updates;
if (isFinalUpdate) {
// getCumulative returns cumulative values for all metrics.
updates = container.getCumulative();
} else {
// getUpdates returns cumulative values only for metrics which have changes
// since the last time updates were committed.
updates = container.getUpdates();
}
return Iterables.concat(
FluentIterable.from(updates.counterUpdates())
.transform(
update ->
MetricsToCounterUpdateConverter.fromCounter(
update.getKey(), true, update.getUpdate())),
FluentIterable.from(updates.distributionUpdates())
.transform(
update ->
MetricsToCounterUpdateConverter.fromDistribution(
update.getKey(), true, update.getUpdate())));
});
}
public void commitMetricUpdates() {
for (MetricsContainerImpl container : containerRegistry.getContainers()) {
container.commitUpdates();
}
}
public Iterable<CounterUpdate> extractMsecCounters(boolean isFinalUpdate) {
return executionStateRegistry.extractUpdates(isFinalUpdate);
}
public Long extractThrottleTime() {
long totalThrottleMsecs = 0L;
for (MetricsContainerImpl container : containerRegistry.getContainers()) {
// TODO(BEAM-7863): Update throttling counters to use generic throttling-msecs metric.
CounterCell dataStoreThrottlingTime =
container.tryGetCounter(
MetricName.named(DATASTORE_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
if (dataStoreThrottlingTime != null) {
totalThrottleMsecs += dataStoreThrottlingTime.getCumulative();
}
CounterCell httpClientApiThrottlingTime =
container.tryGetCounter(
MetricName.named(
HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
if (httpClientApiThrottlingTime != null) {
totalThrottleMsecs += httpClientApiThrottlingTime.getCumulative();
}
CounterCell bigqueryStreamingInsertThrottleTime =
container.tryGetCounter(
MetricName.named(
BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
if (bigqueryStreamingInsertThrottleTime != null) {
totalThrottleMsecs += bigqueryStreamingInsertThrottleTime.getCumulative();
}
CounterCell bigqueryReadThrottleTime =
container.tryGetCounter(
MetricName.named(BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
if (bigqueryReadThrottleTime != null) {
totalThrottleMsecs += bigqueryReadThrottleTime.getCumulative();
}
CounterCell throttlingMsecs =
container.tryGetCounter(DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME);
if (throttlingMsecs != null) {
totalThrottleMsecs += throttlingMsecs.getCumulative();
}
}
return TimeUnit.MILLISECONDS.toSeconds(totalThrottleMsecs);
}
}