/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.fn.control;
import com.google.api.services.dataflow.model.CounterUpdate;
import io.opencensus.common.Scope;
import io.opencensus.trace.SpanBuilder;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.runners.core.metrics.MetricsTranslation;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitRequest;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link WorkExecutor} that processes a list of {@link Operation}s.
*
* <p>Note that this executor is meant to be used with the Fn API. Several of the methods for
* requesting splitting, checkpointing, and work progress are unimplemented.
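*
* <p>A typical use, sketched with construction of the operation list and state tracker elided:
*
* <pre>{@code
* BeamFnMapTaskExecutor executor =
*     BeamFnMapTaskExecutor.forOperations(operations, executionStateTracker);
* executor.execute(); // blocks until the bundle completes
* Iterable<CounterUpdate> updates = executor.extractMetricUpdates();
* }</pre>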
*/
public class BeamFnMapTaskExecutor extends DataflowMapTaskExecutor {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnMapTaskExecutor.class);
private final ProgressTracker progressTracker;
private BeamFnMapTaskExecutor(
List<Operation> operations,
CounterSet counters,
ExecutionStateTracker executionStateTracker) {
super(operations, counters, executionStateTracker);
this.progressTracker = createProgressTracker();
LOG.info("Creating BeamFnMapTaskExecutor");
}
/**
* Creates a new MapTaskExecutor.
*
* @param operations the operations of the map task, in order of execution
*/
public static BeamFnMapTaskExecutor forOperations(
List<Operation> operations, ExecutionStateTracker executionStateTracker) {
return new BeamFnMapTaskExecutor(operations, new CounterSet(), executionStateTracker);
}
/**
* Creates a new MapTaskExecutor with a shared set of counters with its creator.
*
* @param operations the operations of the map task, in order of execution
* @param counters a set of system counters associated with operations, which may get extended
* during execution
*/
public static BeamFnMapTaskExecutor withSharedCounterSet(
List<Operation> operations,
CounterSet counters,
ExecutionStateTracker executionStateTracker) {
return new BeamFnMapTaskExecutor(operations, counters, executionStateTracker);
}
@Override
public void execute() throws Exception {
Tracer tracer = Tracing.getTracer();
SpanBuilder builder = tracer.spanBuilder("MapTaskExecutor.Span").setRecordEvents(true);
// Start the progress tracker before execution (which blocks until execution is finished).
try (Scope unused = builder.startScopedSpan();
AutoCloseable unused2 = progressTrackerCloseable(progressTracker)) {
tracer.getCurrentSpan().addAnnotation("About to execute");
super.execute();
tracer.getCurrentSpan().addAnnotation("Done with execute");
}
}
private AutoCloseable progressTrackerCloseable(ProgressTracker progressTracker) {
progressTracker.start();
return () -> progressTracker.stop();
}
@Override
@Nullable
public Progress getWorkerProgress() throws Exception {
return progressTracker.getWorkerProgress();
}
/**
* {@inheritDoc}
*
* @return User-defined Beam metrics reported over the Fn API.
*/
@Override
public Iterable<CounterUpdate> extractMetricUpdates() {
List<CounterUpdate> result = progressTracker.extractCounterUpdates();
if (result != null && !result.isEmpty()) {
return result;
}
// TODO(BEAM-6189): Remove this fallback once Metrics is deprecated from SDKs.
MetricUpdates updates = progressTracker.extractMetricUpdates();
Iterable<CounterUpdate> deprecatedMetrics =
Iterables.concat(
StreamSupport.stream(updates.counterUpdates().spliterator(), false)
.map(
update ->
MetricsToCounterUpdateConverter.fromCounter(
update.getKey(), true, update.getUpdate()))
.collect(Collectors.toList()),
StreamSupport.stream(updates.distributionUpdates().spliterator(), false)
.map(
update ->
MetricsToCounterUpdateConverter.fromDistribution(
update.getKey(), true, update.getUpdate()))
.collect(Collectors.toList()));
return deprecatedMetrics;
}
@Override
@Nullable
public DynamicSplitResult requestCheckpoint() throws Exception {
return progressTracker.requestCheckpoint();
}
@Override
@Nullable
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) throws Exception {
return progressTracker.requestDynamicSplit(splitRequest);
}
@Override
public ReadOperation getReadOperation() throws Exception {
// TODO: Remove streaming Dataflow's reliance on access to the "ReadOperation".
for (Operation operation : operations) {
if (operation instanceof ReadOperation) {
return (ReadOperation) operation;
}
}
throw new IllegalStateException(String.format("ReadOperation not found in %s", operations));
}
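/** Tracks progress, metrics, and split/checkpoint requests for an executing map task. */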
private interface ProgressTracker {
@Nullable
public Progress getWorkerProgress() throws Exception;
/**
* Returns the metric updates accumulated since the last call to {@link #extractMetricUpdates()}.
*/
@Deprecated
public MetricUpdates extractMetricUpdates();
public List<CounterUpdate> extractCounterUpdates();
@Nullable
public DynamicSplitResult requestCheckpoint() throws Exception;
@Nullable
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest)
throws Exception;
public default void start() {}
public default void stop() {}
}
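/** A no-op {@link ProgressTracker}, used when no {@link ReadOperation} is available. */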
private static class NullProgressTracker implements ProgressTracker {
@Nullable
@Override
public Progress getWorkerProgress() {
return null;
}
@Override
public MetricUpdates extractMetricUpdates() {
return MetricUpdates.EMPTY;
}
@Override
public List<CounterUpdate> extractCounterUpdates() {
return Collections.emptyList();
}
@Nullable
@Override
public DynamicSplitResult requestCheckpoint() {
return null;
}
@Nullable
@Override
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest) {
return null;
}
}
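/**
* A {@link ProgressTracker} that delegates progress reporting, checkpointing, and dynamic
* splitting to a single {@link ReadOperation}.
*/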
private static class ReadOperationProgressTracker implements ProgressTracker {
final ReadOperation readOperation;
public ReadOperationProgressTracker(ReadOperation readOperation) {
this.readOperation = readOperation;
}
@Nullable
@Override
public Progress getWorkerProgress() throws Exception {
return readOperation.getProgress();
}
@Override
public MetricUpdates extractMetricUpdates() {
return MetricUpdates.EMPTY;
}
@Override
public List<CounterUpdate> extractCounterUpdates() {
return Collections.emptyList();
}
@Nullable
@Override
public DynamicSplitResult requestCheckpoint() throws Exception {
return readOperation.requestCheckpoint();
}
@Nullable
@Override
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest)
throws Exception {
return readOperation.requestDynamicSplit(splitRequest);
}
}
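/**
* A {@link ProgressTracker} for the common single-bundle topology: a {@link ReadOperation}
* whose elements are sent to the SDK harness via a {@link RemoteGrpcPortWriteOperation} and
* processed by a {@link RegisterAndProcessBundleOperation}. Progress is polled periodically on
* a dedicated scheduler thread.
*/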
@VisibleForTesting
/*package*/ static class SingularProcessBundleProgressTracker implements ProgressTracker {
private static final int MAX_DATA_POINTS = 1000;
private final ReadOperation readOperation;
private final RemoteGrpcPortWriteOperation grpcWriteOperation;
private final RegisterAndProcessBundleOperation bundleProcessOperation;
private static final long progressUpdatePeriodMs =
ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS;
private int progressErrors;
private final Interpolator<Progress> progressInterpolator;
private final AtomicReference<Progress> latestProgress = new AtomicReference<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> nextProgressFuture;
private final Consumer<Integer> grpcWriteOperationElementsProcessed;
private List<CounterUpdate> counterUpdates = new ArrayList<>();
private final Map<MetricKey, MetricUpdate<Long>> deprecatedCounterUpdates;
private final Map<MetricKey, MetricUpdate<DistributionData>> deprecatedDistributionUpdates;
private final Map<MetricKey, MetricUpdate<GaugeData>> deprecatedGaugeUpdates;
public SingularProcessBundleProgressTracker(
ReadOperation readOperation,
RemoteGrpcPortWriteOperation grpcWriteOperation,
RegisterAndProcessBundleOperation bundleProcessOperation) {
this.readOperation = readOperation;
this.grpcWriteOperation = grpcWriteOperation;
this.bundleProcessOperation = bundleProcessOperation;
this.grpcWriteOperationElementsProcessed = grpcWriteOperation.processedElementsConsumer();
this.progressInterpolator =
new Interpolator<Progress>(MAX_DATA_POINTS) {
@Override
protected Progress interpolate(Progress prev, Progress next, double fraction) {
return prev;
}
};
this.deprecatedCounterUpdates = new HashMap<>();
this.deprecatedDistributionUpdates = new HashMap<>();
this.deprecatedGaugeUpdates = new HashMap<>();
}
private void periodicProgressUpdate() {
updateProgress();
}
@VisibleForTesting
void updateProgress() {
try {
if (bundleProcessOperation.hasFailed()) {
grpcWriteOperation.abortWait();
}
// TODO(BEAM-6189): Replace getProcessBundleProgress with getMonitoringInfos when Metrics
// is deprecated.
ProcessBundleProgressResponse processBundleProgressResponse =
MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
updateMetrics(processBundleProgressResponse.getMonitoringInfosList());
// Supporting deprecated metrics until all supported runners are migrated to using
// MonitoringInfos
Metrics metrics = processBundleProgressResponse.getMetrics();
updateMetricsDeprecated(metrics);
// TODO(migryz): Utilize monitoringInfos here.
// Requires Element Count metrics to be implemented.
double elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics);
grpcWriteOperationElementsProcessed.accept((int) elementsConsumed);
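// Record read progress keyed by the number of elements sent to the SDK harness, so that
// overall progress can be estimated at the point the SDK has actually consumed.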
progressInterpolator.addPoint(
grpcWriteOperation.getElementsSent(), readOperation.getProgress());
latestProgress.set(progressInterpolator.interpolateAndPurge(elementsConsumed));
progressErrors = 0;
} catch (Exception exn) {
if (!isTransientProgressError(exn.getMessage())) {
grpcWriteOperationElementsProcessed.accept(-1); // Not supported.
progressErrors++;
// Only log verbosely every power of two (bitCount == 1) to avoid spamming the logs.
if (Integer.bitCount(progressErrors) == 1) {
LOG.warn(
String.format(
"Progress updating failed %d times. The following exception was safely handled.",
progressErrors),
exn);
} else {
LOG.debug(
String.format(
"Progress updating failed %d times. The following exception was safely handled.",
progressErrors),
exn);
}
}
try {
latestProgress.set(
progressInterpolator.interpolate(grpcWriteOperation.getElementsSent()));
} catch (IllegalStateException exn2) {
// This can happen if the operation has not yet been started; don't update
// the progress in this case.
} catch (Exception e) {
LOG.error("Exception in catch of updateProgress():", e);
throw e;
}
}
}
/**
* Updates the internal metrics state from the provided list of {@link MonitoringInfo}s.
*
* @param monitoringInfos monitoring infos, usually received over the Fn API.
*/
private void updateMetrics(List<MonitoringInfo> monitoringInfos) {
final MonitoringInfoToCounterUpdateTransformer monitoringInfoToCounterUpdateTransformer =
new FnApiMonitoringInfoToCounterUpdateTransformer(
this.bundleProcessOperation.getPtransformIdToUserStepContext(),
this.bundleProcessOperation.getPCollectionIdToNameContext());
counterUpdates =
monitoringInfos.stream()
.map(monitoringInfoToCounterUpdateTransformer::transform)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
* Updates the internal metrics state from the provided (deprecated) {@link Metrics} object.
*
* @param metrics a {@link Metrics} object received over the Fn API.
*/
@Deprecated
private void updateMetricsDeprecated(Metrics metrics) {
metrics
.getPtransformsMap()
.forEach(
(ptransformId, ptransformMetrics) -> {
MetricUpdates ptransformMetricUpdates =
MetricsTranslation.metricUpdatesFromProto(
ptransformId, ptransformMetrics.getUserList());
for (MetricUpdate<Long> update : ptransformMetricUpdates.counterUpdates()) {
deprecatedCounterUpdates.put(update.getKey(), update);
}
for (MetricUpdate<DistributionData> update :
ptransformMetricUpdates.distributionUpdates()) {
deprecatedDistributionUpdates.put(update.getKey(), update);
}
for (MetricUpdate<GaugeData> update : ptransformMetricUpdates.gaugeUpdates()) {
deprecatedGaugeUpdates.put(update.getKey(), update);
}
});
}
@Nullable
@Override
public Progress getWorkerProgress() throws Exception {
return latestProgress.get();
}
@Override
public List<CounterUpdate> extractCounterUpdates() {
return counterUpdates;
}
@Override
public MetricUpdates extractMetricUpdates() {
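// These maps are returned as live references, not copies; they are only cleared in stop(),
// so repeated calls return the full set of accumulated updates.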
Map<MetricKey, MetricUpdate<Long>> snapshotCounterUpdates = deprecatedCounterUpdates;
Map<MetricKey, MetricUpdate<DistributionData>> snapshotDistributionUpdates =
deprecatedDistributionUpdates;
Map<MetricKey, MetricUpdate<GaugeData>> snapshotGaugeUpdates = deprecatedGaugeUpdates;
return MetricUpdates.create(
snapshotCounterUpdates.values(),
snapshotDistributionUpdates.values(),
snapshotGaugeUpdates.values());
}
@Nullable
@Override
public DynamicSplitResult requestCheckpoint() throws Exception {
// TODO: Implement checkpointing
return null;
}
@Nullable
@Override
public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest)
throws Exception {
return readOperation.requestDynamicSplit(splitRequest);
}
@Override
public void start() {
LOG.info("Starting BeamFnMapTaskExecutor, launching progress thread");
progressErrors = 0;
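// Note: scheduleAtFixedRate suppresses all subsequent runs once a run throws, so
// updateProgress() deliberately catches and logs most failures instead of propagating them.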
nextProgressFuture =
scheduler.scheduleAtFixedRate(
this::periodicProgressUpdate,
progressUpdatePeriodMs,
progressUpdatePeriodMs,
TimeUnit.MILLISECONDS);
}
@Override
public void stop() {
LOG.info("Stopping BeamFnMapTaskExecutor, grabbing final metric updates");
nextProgressFuture.cancel(true);
try {
nextProgressFuture.get();
} catch (CancellationException | ExecutionException | InterruptedException exn) {
// expected
}
progressInterpolator.clear();
latestProgress.set(null);
// Set final metrics to precisely the values in this update. This should overwrite, not
// be combined with, all prior updates.
counterUpdates.clear();
deprecatedCounterUpdates.clear();
deprecatedDistributionUpdates.clear();
deprecatedGaugeUpdates.clear();
try {
updateMetrics(MoreFutures.get(bundleProcessOperation.getFinalMonitoringInfos()));
updateMetricsDeprecated(MoreFutures.get(bundleProcessOperation.getFinalMetrics()));
} catch (ExecutionException | InterruptedException exn) {
LOG.info("Failed to get final metrics for bundle", exn);
}
}
}
/**
* Interpolates values between a dynamic set of known data points.
*
* <p>Performs piecewise linear interpolation, extrapolating beyond the known bounds by
* returning the value at the nearest known point.
*
* <p>Data must be added in monotonically increasing order, and may be purged in a similar
* fashion. This makes this class suitable for performing interpolation over a sliding window.
* Regardless of how many points are added, this class only uses a bounded amount of memory.
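*
* <p>For example, a minimal numeric interpolator (an illustrative sketch, not used by this
* worker):
*
* <pre>{@code
* Interpolator<Double> interp =
*     new Interpolator<Double>(100) {
*       protected Double interpolate(Double prev, Double next, double fraction) {
*         return prev + (next - prev) * fraction;
*       }
*     };
* interp.addPoint(0.0, 0.0);
* interp.addPoint(10.0, 100.0);
* interp.interpolate(5.0); // returns 50.0
* }</pre>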
*/
@NotThreadSafe
@VisibleForTesting
/*package*/ abstract static class Interpolator<T> {
private double[] xs;
private T[] ys;
private int numPoints;
private double lowerBound = Double.NEGATIVE_INFINITY;
public Interpolator(int maxDataPoints) {
Preconditions.checkArgument(maxDataPoints >= 2);
xs = new double[maxDataPoints];
Arrays.fill(xs, Double.NaN);
ys = (T[]) new Object[maxDataPoints];
numPoints = 0;
}
public void addPoint(double x, T y) {
Preconditions.checkArgument(
numPoints == 0 || xs[numPoints - 1] <= x,
"Data must be added in monotonicaly increasing order.");
if (numPoints >= xs.length) {
purgeInternal();
}
xs[numPoints] = x;
ys[numPoints] = y;
numPoints += 1;
}
public T interpolate(double x) {
if (numPoints == 0) {
return null;
} else if (x <= xs[0]) {
return ys[0];
} else if (xs[numPoints - 1] <= x) {
return ys[numPoints - 1];
} else {
int index = Arrays.binarySearch(xs, 0, numPoints, x);
if (index >= 0) {
return ys[index];
} else {
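// binarySearch returned (-(insertion point) - 1); recover the insertion point, i.e. the
// index of the first known x greater than the query.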
int after = -index - 1;
int before = after - 1;
return interpolate(ys[before], ys[after], (x - xs[before]) / (xs[after] - xs[before]));
}
}
}
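/**
* Marks data points below {@code x} as eligible for removal. The purge itself happens lazily
* in {@link #addPoint} once the buffer is full.
*/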
public void purgeUpTo(double x) {
lowerBound = x;
}
public T interpolateAndPurge(double x) {
purgeUpTo(x);
return interpolate(x);
}
public void clear() {
Arrays.fill(xs, Double.NaN); // as a precaution
Arrays.fill(ys, null); // free up any references to positions
numPoints = 0;
}
/** Returns a value fraction of the way from prev to next. */
protected abstract T interpolate(T prev, T next, double fraction);
private void purgeInternal() {
assert numPoints > 2;
if (xs[1] < lowerBound) {
// If there are unneeded elements at the front of our array, shift everything over
// to make more room at the end.
int start = Arrays.binarySearch(xs, 0, numPoints, lowerBound);
// If positive, start is the index of the element that matches the location
// of lowerBound in the array. If negative, it can be transformed into the index
// of where it would be inserted. We recover that value. Note that start
// cannot be 0 since we checked xs[1] against lowerBound.
if (start < 0) {
start = -start - 1;
}
// We want to preserve a single value that is lower than lowerBound for interpolation
// purposes.
start--;
numPoints -= start;
System.arraycopy(xs, start, xs, 0, numPoints);
System.arraycopy(ys, start, ys, 0, numPoints);
} else {
// There's no extra room, we must throw away some data.
// Interpolate a set of data points half as dense but over the same range.
double[] newXs = new double[xs.length];
T[] newYs = (T[]) new Object[ys.length];
newXs[0] = xs[0];
newYs[0] = ys[0];
double dx = (xs[numPoints - 1] - xs[0]) / (numPoints / 2.0);
for (int i = 1; i < numPoints / 2; i++) {
double x = xs[0] + i * dx;
newXs[i] = x;
newYs[i] = interpolate(x);
}
newXs[numPoints / 2] = xs[numPoints - 1];
newYs[numPoints / 2] = ys[numPoints - 1];
xs = newXs;
ys = newYs;
numPoints = numPoints / 2 + 1;
}
Arrays.fill(xs, numPoints, ys.length, Double.NaN); // as a precaution
Arrays.fill(ys, numPoints, ys.length, null); // free up any references to positions
}
}
private ProgressTracker createProgressTracker() {
ReadOperation readOperation;
RemoteGrpcPortWriteOperation grpcWriteOperation;
RegisterAndProcessBundleOperation bundleProcessOperation;
try {
readOperation = getReadOperation();
} catch (Exception exn) {
readOperation = null;
LOG.info("Unable to get read operation.", exn);
return new NullProgressTracker();
}
// If there is exactly one of each of RemoteGrpcPortWriteOperation and
// RegisterAndProcessBundleOperation, we know they have the right topology.
try {
grpcWriteOperation =
Iterables.getOnlyElement(
Iterables.filter(operations, RemoteGrpcPortWriteOperation.class));
bundleProcessOperation =
Iterables.getOnlyElement(
Iterables.filter(operations, RegisterAndProcessBundleOperation.class));
} catch (IllegalArgumentException | NoSuchElementException exn) {
// TODO: Handle more than one sdk worker processing a single bundle.
grpcWriteOperation = null;
bundleProcessOperation = null;
LOG.debug("Does not have exactly one grpcWRite and bundleProcess operation.", exn);
}
if (grpcWriteOperation != null && bundleProcessOperation != null) {
return new SingularProcessBundleProgressTracker(
readOperation, grpcWriteOperation, bundleProcessOperation);
} else {
return new ReadOperationProgressTracker(readOperation);
}
}
/** Whether the given error is likely to go away (e.g. the bundle has not started). */
private static boolean isTransientProgressError(String msg) {
return msg != null
&& (msg.contains("Process bundle request not yet scheduled")
|| msg.contains("Unknown process bundle instruction")
|| msg.contains("unstarted operation"));
}
}