blob: 5cc0db71214e017b348e033a1dd60719795181f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.WorkItem;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness;
import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
import org.apache.beam.runners.dataflow.worker.graph.CreateExecutableStageNodeFunction;
import org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction;
import org.apache.beam.runners.dataflow.worker.graph.DeduceFlattenLocationsFunction;
import org.apache.beam.runners.dataflow.worker.graph.DeduceNodeLocationsFunction;
import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
import org.apache.beam.runners.dataflow.worker.graph.LengthPrefixUnknownCoders;
import org.apache.beam.runners.dataflow.worker.graph.MapTaskToNetworkFunction;
import org.apache.beam.runners.dataflow.worker.graph.Networks;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.RemoteGrpcPortNode;
import org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction;
import org.apache.beam.runners.dataflow.worker.graph.ReplacePgbkWithPrecombineFunction;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.util.WeightedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.graph.MutableNetwork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a semi-abstract harness for executing WorkItem tasks in Java workers. Concrete
* implementations need to implement a WorkUnitClient.
*
* <p>{@code BatchDataflowWorker} exposes one main entry point, {@link #getAndPerformWork()}, which
* uses the {@link WorkUnitClient} to get work, execute it, and update the work.
*/
public class BatchDataflowWorker implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BatchDataflowWorker.class);
/** A client to get and update work items. */
private final WorkUnitClient workUnitClient;
/**
* Pipeline options, initially provided via the constructor and partially provided via each work
* unit (the project id is copied from the work item before it is executed).
*/
private final DataflowWorkerHarnessOptions options;
/** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
/**
* Generator of unique ids. It is static, so it is shared by every worker instance in the process.
*/
private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();
/**
* Function which converts map tasks to their network representation for execution.
*
* <p>It will:
*
* <ul>
* <li>Fix multi output infos to exist on all ParDo instructions.
* <li>Translate the map task to a network representation.
* </ul>
*/
private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork =
new FixMultiOutputInfosOnParDoInstructions(idGenerator)
.andThen(new MapTaskToNetworkFunction(idGenerator));
/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
/** Registry of known {@link SinkFactory SinkFactories}. */
private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();
/**
* A side input cache shared between all execution contexts. The constructor builds it with soft
* values and a maximum weight, in bytes, derived from the worker cache size option.
*/
private final Cache<?, WeightedValue<?>> sideInputDataCache;
/**
* A side input cache shared between all execution contexts. This cache is meant to store values
* as weak references. This allows for insertion of logical keys with zero weight since they will
* only be scoped to the lifetime of the value being cached.
*/
private final Cache<?, ?> sideInputWeakReferenceCache;
/** Port handed to {@link WorkerStatusPages} when the status pages are created. */
private static final int DEFAULT_STATUS_PORT = 8081;
/** Status pages returning health of worker. */
private WorkerStatusPages statusPages;
/**
* Periodic sender of debug information to the debug capture service. Stays {@code null} when the
* "disable_debug_capture" experiment is set.
*/
private DebugCapture.Manager debugCaptureManager = null;
/**
* A weight in "bytes" for the overhead of a {@link Weighted} wrapper in the cache. It is just an
* approximation so it is OK for it to be fairly arbitrary as long as it is nonzero.
*/
private static final int OVERHEAD_WEIGHT = 8;
/** Number of bytes in a megabyte; converts the worker cache size option from MB to bytes. */
private static final long MEGABYTES = 1024 * 1024;
/**
* Limit the number of logical references. Weak references may never be cleared if the object is
* long lived irrespective if the user actually is interested in the key lookup anymore.
*/
private static final int MAX_LOGICAL_REFERENCES = 1_000_000;
/** How many concurrent write operations to a cache should we allow. */
private static final int CACHE_CONCURRENCY_LEVEL = 4 * Runtime.getRuntime().availableProcessors();
/**
* Registry of SDK harnesses. It supplies the harness assigned to each work item and the Fn API
* service descriptors used when building networks and port nodes.
*/
private final SdkHarnessRegistry sdkHarnessRegistry;
/**
* Full translation from a {@link MapTask} to the network that is executed. It equals {@link
* #mapTaskToBaseNetwork} unless the "beam_fn_api" experiment is set, in which case the constructor
* appends further network transformations.
*/
private final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork;
/** Memory monitor created from the options; it is also passed to the status pages. */
private final MemoryMonitor memoryMonitor;
/** Daemon thread running {@link #memoryMonitor}; {@link #close()} waits for it to exit. */
private final Thread memoryMonitorThread;
/**
 * Creates a {@link BatchDataflowWorker} that runs user functions through intrinsic Java
 * execution.
 *
 * <p>This mode is also called the "legacy" or "pre-portability" approach. It is not yet
 * deprecated because users have no compatible path forward. The worker is built without a
 * pipeline proto and with an empty SDK harness registry.
 */
static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
    WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
  // Intrinsic execution has no portable pipeline definition to hand over.
  final RunnerApi.Pipeline noPipeline = null;
  SdkHarnessRegistry emptyRegistry = SdkHarnessRegistries.emptySdkHarnessRegistry();
  DataflowMapTaskExecutorFactory intrinsicFactory =
      IntrinsicMapTaskExecutorFactory.defaultFactory();
  return new BatchDataflowWorker(
      noPipeline, emptyRegistry, workUnitClient, intrinsicFactory, options);
}
/**
 * Creates a {@link BatchDataflowWorker} that runs user functions through the Beam "Fn API".
 *
 * <p>This mode is also called the "portable" or "Beam model" approach. The given pipeline (which
 * may be {@code null}) and SDK harness registry are passed straight to the worker.
 */
static BatchDataflowWorker forBatchFnWorkerHarness(
    @Nullable RunnerApi.Pipeline pipeline,
    SdkHarnessRegistry sdkHarnessRegistry,
    WorkUnitClient workUnitClient,
    DataflowWorkerHarnessOptions options) {
  DataflowMapTaskExecutorFactory fnApiFactory = BeamFnMapTaskExecutorFactory.defaultFactory();
  return new BatchDataflowWorker(
      pipeline, sdkHarnessRegistry, workUnitClient, fnApiFactory, options);
}
/**
 * Builds a worker and starts its background services.
 *
 * <p>Besides storing its arguments, the constructor creates the two side input caches, the
 * memory monitor and the status pages, starts the debug capture manager unless the
 * "disable_debug_capture" experiment is set, selects the map-task-to-network translation based on
 * the "beam_fn_api" experiment, and finally starts the memory monitor thread and the execution
 * state sampler.
 *
 * @param pipeline the pipeline definition used to build SDK stages; may be {@code null}
 * @param sdkHarnessRegistry registry supplying SDK harnesses and Fn API service descriptors
 * @param workUnitClient client used to get and update work items
 * @param mapTaskExecutorFactory factory creating the executor for map tasks
 * @param options worker harness options
 */
protected BatchDataflowWorker(
    @Nullable RunnerApi.Pipeline pipeline,
    SdkHarnessRegistry sdkHarnessRegistry,
    WorkUnitClient workUnitClient,
    DataflowMapTaskExecutorFactory mapTaskExecutorFactory,
    DataflowWorkerHarnessOptions options) {
  this.mapTaskExecutorFactory = mapTaskExecutorFactory;
  this.sdkHarnessRegistry = sdkHarnessRegistry;
  this.workUnitClient = workUnitClient;
  this.options = options;

  // Cache weights are expressed in bytes, hence the conversion from megabytes.
  this.sideInputDataCache =
      CacheBuilder.newBuilder()
          .maximumWeight(options.getWorkerCacheMb() * MEGABYTES)
          .weigher(Weighers.fixedWeightKeys(OVERHEAD_WEIGHT))
          .softValues()
          .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
          .build();
  this.sideInputWeakReferenceCache =
      CacheBuilder.newBuilder()
          .maximumSize(MAX_LOGICAL_REFERENCES)
          .weakValues()
          .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
          .build();

  this.memoryMonitor = MemoryMonitor.fromOptions(options);
  this.statusPages =
      WorkerStatusPages.create(
          DEFAULT_STATUS_PORT, this.memoryMonitor, sdkHarnessRegistry::sdkHarnessesAreHealthy);

  final boolean debugCaptureDisabled =
      DataflowRunner.hasExperiment(options, "disable_debug_capture");
  if (!debugCaptureDisabled) {
    this.debugCaptureManager =
        initializeAndStartDebugCaptureManager(options, statusPages.getDebugCapturePages());
  }

  // TODO: this conditional -> two implementations of common interface, or
  // param/injection
  if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
    Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> prefixSdkCoders =
        LengthPrefixUnknownCoders::forSdkNetwork;
    final boolean executableStageBundles =
        DataflowRunner.hasExperiment(options, "use_executable_stage_bundle_execution");

    // Pick the function that turns the SDK part of the network into a single fused node.
    final Function<MutableNetwork<Node, Edge>, Node> fuseSdkStage;
    if (executableStageBundles) {
      fuseSdkStage = new CreateExecutableStageNodeFunction(pipeline, idGenerator);
    } else if (pipeline == null) {
      fuseSdkStage =
          RegisterNodeFunction.withoutPipeline(
              idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
    } else {
      fuseSdkStage =
          RegisterNodeFunction.forPipeline(
              pipeline, idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
    }

    // Both variants build the same function; only the trailing flag differs.
    Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> toRunnerNetwork =
        new CreateRegisterFnOperationFunction(
            idGenerator,
            this::createPortNode,
            prefixSdkCoders.andThen(fuseSdkStage),
            executableStageBundles);

    this.mapTaskToNetwork =
        mapTaskToBaseNetwork
            .andThen(new ReplacePgbkWithPrecombineFunction())
            .andThen(new DeduceNodeLocationsFunction())
            .andThen(new DeduceFlattenLocationsFunction())
            .andThen(new CloneAmbiguousFlattensFunction())
            .andThen(toRunnerNetwork)
            .andThen(LengthPrefixUnknownCoders::andReplaceForRunnerNetwork);
  } else {
    this.mapTaskToNetwork = mapTaskToBaseNetwork;
  }

  this.memoryMonitorThread = startMemoryMonitorThread(memoryMonitor);
  ExecutionStateSampler.instance().start();
}
/**
 * Creates a {@link DebugCapture.Manager} for the given options and capturable pages, starts it,
 * and returns it.
 */
private static DebugCapture.Manager initializeAndStartDebugCaptureManager(
    DataflowWorkerHarnessOptions options, Collection<Capturable> debugCapturePages) {
  DebugCapture.Manager manager = new DebugCapture.Manager(options, debugCapturePages);
  manager.start();
  return manager;
}
/**
 * Starts the given memory monitor on a new daemon thread named "MemoryMonitor" that runs at
 * minimum priority, and returns that thread.
 */
private static Thread startMemoryMonitorThread(MemoryMonitor memoryMonitor) {
  Thread monitorThread = new Thread(memoryMonitor, "MemoryMonitor");
  monitorThread.setDaemon(true);
  monitorThread.setPriority(Thread.MIN_PRIORITY);
  monitorThread.start();
  return monitorThread;
}
/**
 * Creates a new {@link RemoteGrpcPortNode} that points at the registry's Fn data API service
 * descriptor and carries a freshly generated id.
 */
private Node createPortNode() {
  RemoteGrpcPort dataPort =
      RemoteGrpcPort.newBuilder()
          .setApiServiceDescriptor(sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
          .build();
  return RemoteGrpcPortNode.create(dataPort, idGenerator.getId());
}
/**
 * Fetches one WorkItem and performs it.
 *
 * <p>The method keeps asking the {@link WorkUnitClient} for work until a work item is returned or
 * the client throws. This is deliberate: the service intelligently uses hanging gets to assign
 * work as quickly as possible, so no exponential backoff is needed between work requests.
 *
 * @return {@code true} if the work succeeded, {@code false} otherwise
 * @throws IOException if there is a failure of the WorkUnitClient
 */
public boolean getAndPerformWork() throws IOException {
  Optional<WorkItem> polled = workUnitClient.getWorkItem();
  while (!polled.isPresent()) {
    // Nothing was assigned this time; ask again.
    polled = workUnitClient.getWorkItem();
  }
  WorkItem item = polled.get();
  WorkItemStatusClient statusClient = new WorkItemStatusClient(workUnitClient, item);
  return doWork(item, statusClient);
}
/**
 * Performs the given work item and reports the outcome through {@code workItemStatusClient}.
 *
 * <p>An SDK harness is obtained from the registry before anything else, so a failure there
 * propagates to the caller. Any {@link Throwable} raised afterwards, while setting up or
 * executing the work, is passed to {@code reportError} and turned into a {@code false} result. In
 * every case the executor (if one was created) is closed and the harness is handed back to the
 * registry.
 *
 * @return {@code true} if the work ran and success was reported, {@code false} if an error was
 *     reported instead
 * @throws IOException Only if the WorkUnitClient fails.
 */
@VisibleForTesting
boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) throws IOException {
  LOG.debug("Executing: {}", workItem);
  DataflowWorkExecutor executor = null;
  SdkWorkerHarness harness = sdkHarnessRegistry.getAvailableWorkerAndAssignWork();
  try {
    // The project id arrives with the work unit, so copy it into the options first.
    options.setProject(workItem.getProjectId());
    final String stageName = stageNameOf(workItem);

    CounterSet counters = new CounterSet();
    BatchModeExecutionContext context =
        BatchModeExecutionContext.create(
            counters,
            sideInputDataCache,
            sideInputWeakReferenceCache,
            readerRegistry,
            options,
            stageName,
            String.valueOf(workItem.getId()));

    if (workItem.getMapTask() != null) {
      MutableNetwork<Node, Edge> network = mapTaskToNetwork.apply(workItem.getMapTask());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Network as Graphviz .dot: {}", Networks.toDot(network));
      }
      executor =
          mapTaskExecutorFactory.create(
              harness.getControlClientHandler(),
              harness.getGrpcDataFnServer(),
              sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
              harness.getGrpcStateFnServer(),
              network,
              options,
              stageName,
              readerRegistry,
              sinkRegistry,
              context,
              counters,
              idGenerator);
    } else if (workItem.getSourceOperationTask() != null) {
      executor =
          SourceOperationExecutorFactory.create(
              options, workItem.getSourceOperationTask(), counters, context, stageName);
    } else {
      throw new IllegalStateException("Work Item was neither a MapTask nor a SourceOperation");
    }

    workItemStatusClient.setWorker(executor, context);
    executeWork(
        executor, new DataflowWorkProgressUpdater(workItemStatusClient, workItem, executor));
    workItemStatusClient.reportSuccess();
    return true;
  } catch (Throwable failure) {
    workItemStatusClient.reportError(failure);
    return false;
  } finally {
    closeAfterWork(executor);
    if (harness != null) {
      sdkHarnessRegistry.completeWork(harness);
    }
  }
}

/** Returns the stage name of the work item's map task or, failing that, its source operation. */
private static String stageNameOf(WorkItem workItem) {
  if (workItem.getMapTask() != null) {
    return workItem.getMapTask().getStageName();
  }
  if (workItem.getSourceOperationTask() != null) {
    return workItem.getSourceOperationTask().getStageName();
  }
  throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
}

/** Closes the executor if there is one; a failure to close is logged and not rethrown. */
private static void closeAfterWork(@Nullable DataflowWorkExecutor executor) {
  if (executor == null) {
    return;
  }
  try {
    executor.close();
  } catch (Exception exn) {
    LOG.warn(
        "Uncaught exception while closing worker. All work has already committed or "
            + "been marked for retry.",
        exn);
  }
}
/**
 * Runs the executor while the progress updater reports progress. Package-private for testing
 * only.
 *
 * <p>Progress reporting is started first and is always stopped afterwards, whether or not
 * execution succeeded.
 *
 * @throws Exception if executing the work or stopping progress reporting fails
 */
void executeWork(DataflowWorkExecutor worker, DataflowWorkProgressUpdater progressUpdater)
    throws Exception {
  progressUpdater.startReportingProgress();
  try {
    // This call blocks for as long as the work is executing.
    worker.execute();
  } finally {
    // If the final progress update fails, stopReportingProgress can throw. The task must then
    // be marked as failed for correctness, so the exception is allowed to propagate.
    progressUpdater.stopReportingProgress();
  }
}
/** Starts the status pages that report the health of this worker. */
public void startStatusServer() {
  this.statusPages.start();
}
/**
 * Cleans up allocated resources.
 *
 * <p>Stops the debug capture manager (if one was started), the memory monitor, the status pages
 * and the execution state sampler, then waits up to five seconds for the memory monitor thread to
 * exit. If the wait is interrupted, the interrupt status of the calling thread is restored so
 * that callers can still observe it.
 */
@Override
public void close() {
  // TODO: Implement proper cleanup logic.
  if (this.debugCaptureManager != null) {
    this.debugCaptureManager.stop();
  }
  this.memoryMonitor.stop();
  this.statusPages.stop();
  ExecutionStateSampler.instance().stop();

  long timeoutMilliSec = 5 * 1000;
  try {
    this.memoryMonitorThread.join(timeoutMilliSec);
  } catch (InterruptedException ex) {
    // Catching InterruptedException clears the interrupt flag; set it again for the caller.
    Thread.currentThread().interrupt();
    // The exception is passed as the trailing throwable argument so that its stack trace is
    // logged; a "{}" placeholder for it would be left unsubstituted in the message.
    LOG.warn("Failed to wait for monitor thread to exit.", ex);
  }
  if (this.memoryMonitorThread.isAlive()) {
    LOG.warn("memoryMonitorThread didn't exit. Please, check for potential memory leaks.");
  }
}
}