blob: 20bdee47f330525b5fcc1e11ecd16f301fd8aab5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.api.services.dataflow.model.SourceOperationResponse;
import com.google.api.services.dataflow.model.Status;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wrapper around {@link WorkUnitClient} with methods for creating and sending work item status
* updates.
*/
// Very likely real potential for bugs - https://issues.apache.org/jira/browse/BEAM-6565
@SuppressFBWarnings("IS2_INCONSISTENT_SYNC")
public class WorkItemStatusClient {

  private static final Logger LOG = LoggerFactory.getLogger(WorkItemStatusClient.class);

  /** The work item whose status this client reports. */
  private final WorkItem workItem;

  /** Client through which status updates are sent to the Dataflow service. */
  private final WorkUnitClient workUnitClient;

  /** The executor for this work item; null until {@link #setWorker} is called. */
  private @Nullable DataflowWorkExecutor worker;

  /**
   * Report index to attach to the next status update; seeded from the work item's initial report
   * index and advanced from each successful service response in {@link #execute}.
   */
  private Long nextReportIndex;

  /**
   * Lazily-computed cache for {@link #uniqueWorkId()}. NOTE(review): {@code transient} has no
   * effect here — this class does not implement {@code Serializable}.
   */
  private transient String uniqueWorkId = null;

  /** True once a completed (final) status has been sent; no further reports are allowed after. */
  private boolean finalStateSent = false;

  /** Execution context for this work item; null until {@link #setWorker} is called. */
  @Nullable private BatchModeExecutionContext executionContext;

  /**
   * Construct a partly-initialized {@link WorkItemStatusClient}. Once the {@link
   * DataflowWorkExecutor worker} has been instantiated initialization should be completed by
   * calling {@link #setWorker}.
   *
   * <p>Initialization is split into two steps because we may need to report failures during the
   * construction of the worker.
   */
  public WorkItemStatusClient(WorkUnitClient workUnitClient, WorkItem workItem) {
    this.workUnitClient = workUnitClient;
    this.workItem = workItem;
    this.nextReportIndex =
        checkNotNull(workItem.getInitialReportIndex(), "WorkItem missing initial report index");
  }

  /**
   * Returns a unique identifier for this work item of the form
   * {@code "<projectId>;<jobId>;<workItemId>"}, computed lazily and cached.
   */
  public String uniqueWorkId() {
    if (uniqueWorkId == null) {
      uniqueWorkId =
          String.format("%s;%s;%s", workItem.getProjectId(), workItem.getJobId(), workItem.getId());
    }
    return uniqueWorkId;
  }

  /**
   * Finish initialization of the {@link WorkItemStatusClient} by assigning the {@link
   * DataflowWorkExecutor} and {@link BatchModeExecutionContext}.
   */
  public synchronized void setWorker(
      DataflowWorkExecutor worker, BatchModeExecutionContext executionContext) {
    checkArgument(worker != null, "worker must be non-null");
    // Enforce the two-step initialization contract: setWorker is one-shot.
    checkState(this.worker == null, "Can only call setWorker once");
    this.worker = worker;
    this.executionContext = executionContext;
  }

  /**
   * Return the {@link WorkItemServiceState} resulting from sending an error completion status.
   *
   * <p>This is a final (completed) status; no further reports may be sent afterwards. May be
   * called before {@link #setWorker}, so that failures during worker construction can still be
   * reported.
   */
  public synchronized WorkItemServiceState reportError(Throwable e) throws IOException {
    checkState(!finalStateSent, "cannot reportUpdates after sending a final state");
    WorkItemStatus status = createStatusUpdate(true);

    // TODO: Provide more structure representation of error, e.g., the serialized exception object.
    // TODO: Look into moving the stack trace thinning into the client.
    // Unwrap UserCodeException so the reported error points at the user's code, not the wrapper.
    Throwable t = e instanceof UserCodeException ? e.getCause() : e;

    Status error = new Status();
    error.setCode(2); // Code.UNKNOWN.  TODO: Replace with a generated definition.
    // TODO: Attach the stack trace as exception details, not to the message.
    String logPrefix = String.format("Failure processing work item %s", uniqueWorkId());
    if (isOutOfMemoryError(t)) {
      String message =
          "An OutOfMemoryException occurred. Consider specifying higher memory "
              + "instances in PipelineOptions.\n";
      // OOM stack traces are noise; log only the advisory message but ship the formatted
      // exception to the service.
      LOG.error("{}: {}", logPrefix, message);
      error.setMessage(message + DataflowWorkerLoggingHandler.formatException(t));
    } else {
      LOG.error(
          "{}: Uncaught exception occurred during work unit execution. This will be retried.",
          logPrefix,
          t);
      error.setMessage(DataflowWorkerLoggingHandler.formatException(t));
    }
    status.setErrors(ImmutableList.of(error));

    return execute(status);
  }

  /**
   * Return the {@link WorkItemServiceState} resulting from sending a success completion status.
   *
   * <p>This is a final (completed) status; no further reports may be sent afterwards. Requires
   * {@link #setWorker} to have been called.
   */
  public synchronized WorkItemServiceState reportSuccess() throws IOException {
    checkState(!finalStateSent, "cannot reportSuccess after sending a final state");
    checkState(worker != null, "setWorker should be called before reportSuccess");

    WorkItemStatus status = createStatusUpdate(true);

    if (worker instanceof SourceOperationExecutor) {
      // TODO: Find out a generic way for the DataflowWorkExecutor to report work-specific results
      // into the work update.
      SourceOperationResponse response = ((SourceOperationExecutor) worker).getResponse();
      if (response != null) {
        status.setSourceOperationResponse(response);
      }
    }

    LOG.info("Success processing work item {}", uniqueWorkId());
    return execute(status);
  }

  /**
   * Return the {@link WorkItemServiceState} resulting from sending a progress update.
   *
   * <p>This is a non-final status: it carries current progress, any dynamic-split result, and a
   * request to extend the work item lease by {@code requestedLeaseDuration}.
   *
   * @param dynamicSplitResult result of a dynamic split, if one occurred; may be null
   * @param requestedLeaseDuration how long to extend the lease; must be non-null
   */
  public synchronized WorkItemServiceState reportUpdate(
      @Nullable DynamicSplitResult dynamicSplitResult, Duration requestedLeaseDuration)
      throws Exception {
    checkState(worker != null, "setWorker should be called before reportUpdate");
    checkState(!finalStateSent, "cannot reportUpdates after sending a final state");
    checkArgument(requestedLeaseDuration != null, "requestLeaseDuration must be non-null");

    WorkItemStatus status = createStatusUpdate(false);
    status.setRequestedLeaseDuration(TimeUtil.toCloudDuration(requestedLeaseDuration));
    populateProgress(status);
    populateSplitResult(status, dynamicSplitResult);

    return execute(status);
  }

  /** Returns true if {@code t} or any throwable in its cause chain is an {@link OutOfMemoryError}. */
  private static boolean isOutOfMemoryError(Throwable t) {
    while (t != null) {
      if (t instanceof OutOfMemoryError) {
        return true;
      }
      t = t.getCause();
    }
    return false;
  }

  /**
   * Sends {@code status} to the service, records the next report index from the response, and
   * commits any metrics that were included in the update.
   *
   * <p>Returns null when the service returns no state; in that case the report index is not
   * advanced and metrics are not committed. NOTE(review): the {@code !finalStateSent} check for a
   * completed status happens after the report has already been sent — confirm whether a duplicate
   * final report should fail before sending instead.
   */
  @Nullable
  private synchronized WorkItemServiceState execute(WorkItemStatus status) throws IOException {
    WorkItemServiceState result = workUnitClient.reportWorkItemStatus(status);
    if (result != null) {
      nextReportIndex = result.getNextReportIndex();
      // The service accepted the update, so the metric deltas it carried may now be committed.
      commitMetrics();
    }

    if (status.getCompleted()) {
      checkState(!finalStateSent, "cannot reportUpdates after sending a final state");
      finalStateSent = true;
    }

    return result;
  }

  /**
   * Attaches a dynamic split result to {@code status}: either a stop position (for a
   * position-based split) or a custom-source split. A null result is a no-op; any other type is
   * rejected.
   */
  @VisibleForTesting
  synchronized void populateSplitResult(
      WorkItemStatus status, DynamicSplitResult dynamicSplitResult) {
    if (dynamicSplitResult instanceof NativeReader.DynamicSplitResultWithPosition) {
      NativeReader.DynamicSplitResultWithPosition asPosition =
          (NativeReader.DynamicSplitResultWithPosition) dynamicSplitResult;
      status.setStopPosition(
          SourceTranslationUtils.toCloudPosition(asPosition.getAcceptedPosition()));
    } else if (dynamicSplitResult instanceof WorkerCustomSources.BoundedSourceSplit) {
      status.setDynamicSourceSplit(
          WorkerCustomSources.toSourceSplit(
              (WorkerCustomSources.BoundedSourceSplit<?>) dynamicSplitResult));
    } else if (dynamicSplitResult != null) {
      throw new IllegalArgumentException(
          "Unexpected type of dynamic split result: " + dynamicSplitResult);
    }
  }

  /**
   * Attaches the worker's current progress to {@code status}, if the worker reports any.
   * Callers must have called {@link #setWorker} first (worker is dereferenced unconditionally).
   */
  @VisibleForTesting
  synchronized void populateProgress(WorkItemStatus status) throws Exception {
    Progress progress = worker.getWorkerProgress();
    if (progress != null) {
      status.setReportedProgress(SourceTranslationUtils.readerProgressToCloudProgress(progress));
    }
  }

  /**
   * Attaches internal state-sampler metrics (current state name, transition count, time since
   * last transition) to {@code status}. Emits an empty list when no execution context or state
   * tracker is available.
   */
  @VisibleForTesting
  synchronized void populateMetricUpdates(WorkItemStatus status) {
    List<MetricUpdate> updates = new ArrayList<>();
    if (executionContext != null && executionContext.getExecutionStateTracker() != null) {
      ExecutionStateTracker tracker = executionContext.getExecutionStateTracker();
      MetricUpdate update = new MetricUpdate();
      update.setKind("internal");
      MetricStructuredName name = new MetricStructuredName();
      name.setName("state-sampler");
      update.setName(name);
      Map<String, Object> metric = new HashMap<>();
      ExecutionState state = tracker.getCurrentState();
      if (state != null) {
        metric.put("last-state-name", state.getDescription());
      }
      metric.put("num-transitions", tracker.getNumTransitions());
      metric.put("last-state-duration-ms", tracker.getMillisSinceLastTransition());
      update.setInternal(metric);
      updates.add(update);
    }
    status.setMetricUpdates(updates);
  }

  /**
   * Builds a {@link WorkItemStatus} carrying the current report index, metrics, counters, and
   * throttle time.
   *
   * @param isFinal whether this status marks the work item as completed
   */
  private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) {
    WorkItemStatus status = new WorkItemStatus();
    status.setWorkItemId(Long.toString(workItem.getId()));
    status.setCompleted(isFinal);
    status.setReportIndex(
        checkNotNull(nextReportIndex, "nextReportIndex should be non-null when sending an update"));

    // Metrics and counters can only be gathered once a worker has been attached; reportError may
    // run before setWorker.
    if (worker != null) {
      populateMetricUpdates(status);
      populateCounterUpdates(status);
    }

    double throttleTime = extractThrottleTime();
    // NOTE(review): the setter's name says seconds but extractThrottleTime() returns a long with
    // no unit visible here — confirm it is seconds rather than milliseconds.
    status.setTotalThrottlerWaitTimeSeconds(throttleTime);

    return status;
  }

  /**
   * Attaches all counter updates (output counters, user metrics, msec counters, and SDK-reported
   * metrics) to {@code status}. No-op when no worker has been attached.
   *
   * <p>Updates are de-duplicated by counter name via a map, so a later extraction overwrites an
   * earlier one with the same name.
   */
  @VisibleForTesting
  synchronized void populateCounterUpdates(WorkItemStatus status) {
    if (worker == null) {
      return;
    }

    // Checking against boolean, because getCompleted can return null
    boolean isFinalUpdate = Boolean.TRUE.equals(status.getCompleted());

    // Keyed by structured name-and-metadata when present, falling back to name-and-kind.
    Map<Object, CounterUpdate> counterUpdatesMap = new HashMap<>();
    final Consumer<CounterUpdate> appendCounterUpdate =
        x ->
            counterUpdatesMap.put(
                x.getStructuredNameAndMetadata() == null
                    ? x.getNameAndKind()
                    : x.getStructuredNameAndMetadata(),
                x);

    // Output counters
    extractCounters(worker.getOutputCounters()).forEach(appendCounterUpdate);

    // User metrics reported in Worker
    extractMetrics(isFinalUpdate).forEach(appendCounterUpdate);

    // MSec counters reported in worker
    extractMsecCounters(isFinalUpdate).forEach(appendCounterUpdate);

    // Metrics reported in SDK runner.
    // This includes all different kinds of metrics coming from SDK.
    // Keep in mind that these metrics might contain different types of counter names:
    // i.e. structuredNameAndMetadata and nameAndKind
    worker.extractMetricUpdates().forEach(appendCounterUpdate);

    status.setCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values()));
  }

  /**
   * Extracts cumulative (non-delta) counter updates from the given counter set; empty when the
   * set is null.
   */
  private synchronized Iterable<CounterUpdate> extractCounters(@Nullable CounterSet counters) {
    if (counters == null) {
      return Collections.emptyList();
    }

    // Currently we lack a reliable exactly-once delivery mechanism for
    // work updates, i.e. they can be retried or reordered, so sending
    // delta updates could lead to double-counted or missed contributions.
    // However, delta updates may be beneficial for performance.
    // TODO: Implement exactly-once delivery and use deltas,
    // if it ever becomes clear that deltas are necessary for performance.
    boolean delta = false;
    return counters.extractUpdates(delta, DataflowCounterUpdateExtractor.INSTANCE);
  }

  /**
   * This and {@link #commitMetrics} need to be synchronized since we should not call {@link
   * MetricsContainerImpl#getUpdates} on any object within an operation while also calling {@link
   * MetricsContainerImpl#commitUpdates}.
   */
  private synchronized Iterable<CounterUpdate> extractMetrics(boolean isFinalUpdate) {
    return executionContext == null
        ? Collections.emptyList()
        : executionContext.extractMetricUpdates(isFinalUpdate);
  }

  /** Extracts msec counters from the execution context; empty when no context is attached. */
  public Iterable<CounterUpdate> extractMsecCounters(boolean isFinalUpdate) {
    return executionContext == null
        ? Collections.emptyList()
        : executionContext.extractMsecCounters(isFinalUpdate);
  }

  /** Extracts accumulated throttle time from the execution context; 0 when no context is attached. */
  public long extractThrottleTime() {
    return executionContext == null ? 0L : executionContext.extractThrottleTime();
  }

  /**
   * This and {@link #extractMetrics} need to be synchronized since we should not call {@link
   * MetricsContainerImpl#getUpdates} on any object within an operation while also calling {@link
   * MetricsContainerImpl#commitUpdates}.
   */
  @VisibleForTesting
  synchronized void commitMetrics() {
    if (executionContext == null) {
      return;
    }
    executionContext.commitMetricUpdates();
  }
}