blob: 038c45b3545c343899168cb26ff3f3d990c66dc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness.control;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.data.QueueingBeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.Builder;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Processes {@link BeamFnApi.ProcessBundleRequest}s by materializing the set of required runners
 * for each {@link RunnerApi.FunctionSpec}, wiring them together based upon the {@code input} and
 * {@code output} map definitions.
 *
 * <p>Finally executes the resulting DAG by starting all runners in reverse topological order, and
 * finishing all runners in forward topological order.
 */
public class ProcessBundleHandler {

  // TODO: What should the initial set of URNs be?
  private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
  public static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";

  private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);

  /**
   * PTransform runner factories keyed by URN, collected from every {@link Registrar} found through
   * {@link ServiceLoader}. {@link ImmutableMap.Builder#build()} rejects duplicate keys, so two
   * registrars claiming the same URN cause class initialization to fail.
   */
  private static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;

  static {
    // A TreeSet ordered by ObjectsClassComparator gives a deterministic registrar order.
    // NOTE(review): the comparator presumably orders by class name; confirm in ReflectHelpers.
    Set<Registrar> pipelineRunnerRegistrars =
        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
    pipelineRunnerRegistrars.addAll(
        Lists.newArrayList(ServiceLoader.load(Registrar.class, ReflectHelpers.findClassLoader())));

    // Load all registered PTransform runner factories.
    ImmutableMap.Builder<String, PTransformRunnerFactory> builder = ImmutableMap.builder();
    for (Registrar registrar : pipelineRunnerRegistrars) {
      builder.putAll(registrar.getPTransformRunnerFactories());
    }
    REGISTERED_RUNNER_FACTORIES = builder.build();
  }

  private final PipelineOptions options;
  private final Function<String, Message> fnApiRegistry;
  private final BeamFnDataClient beamFnDataClient;
  private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
  private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
  private final PTransformRunnerFactory defaultPTransformRunnerFactory;

  /**
   * Creates a handler that uses every {@link PTransformRunnerFactory} registered on the classpath.
   *
   * @param options pipeline options handed to every runner factory
   * @param fnApiRegistry resolves a process bundle descriptor reference to the registered message
   * @param beamFnDataClient data plane client; wrapped per bundle in a {@link
   *     QueueingBeamFnDataClient}
   * @param beamFnStateGrpcClientCache supplies state clients for bundles whose descriptor declares
   *     a State API service descriptor
   */
  public ProcessBundleHandler(
      PipelineOptions options,
      Function<String, Message> fnApiRegistry,
      BeamFnDataClient beamFnDataClient,
      BeamFnStateGrpcClientCache beamFnStateGrpcClientCache) {
    this(
        options,
        fnApiRegistry,
        beamFnDataClient,
        beamFnStateGrpcClientCache,
        REGISTERED_RUNNER_FACTORIES);
  }

  /**
   * Creates a handler backed by an explicit set of runner factories.
   *
   * @param urnToPTransformRunnerFactoryMap runner factories keyed by PTransform URN; a URN missing
   *     from this map resolves to a factory that fails and reports the known URNs
   */
  @VisibleForTesting
  ProcessBundleHandler(
      PipelineOptions options,
      Function<String, Message> fnApiRegistry,
      BeamFnDataClient beamFnDataClient,
      BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
      Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) {
    this.options = options;
    this.fnApiRegistry = fnApiRegistry;
    this.beamFnDataClient = beamFnDataClient;
    this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
    this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
    this.defaultPTransformRunnerFactory =
        new UnknownPTransformRunnerFactory(urnToPTransformRunnerFactoryMap.keySet());
  }

  /**
   * Creates the runner for {@code pTransform}, first creating (recursively) the runners of every
   * transform that consumes one of its output PCollections. Consumers are therefore always created
   * before their producers, which builds the DAG in reverse topological order.
   *
   * @throws IllegalArgumentException if the transform has no spec or is a composite
   * @throws IOException if a runner factory fails to create its runner
   */
  private void createRunnerAndConsumersForPTransformRecursively(
      BeamFnStateClient beamFnStateClient,
      BeamFnDataClient queueingClient,
      String pTransformId,
      PTransform pTransform,
      Supplier<String> processBundleInstructionId,
      ProcessBundleDescriptor processBundleDescriptor,
      SetMultimap<String, String> pCollectionIdsToConsumingPTransforms,
      PCollectionConsumerRegistry pCollectionConsumerRegistry,
      Set<String> processedPTransformIds,
      PTransformFunctionRegistry startFunctionRegistry,
      PTransformFunctionRegistry finishFunctionRegistry,
      BundleSplitListener splitListener)
      throws IOException {
    // Skip reprocessing processed pTransforms. A transform is only marked as processed after all
    // of its downstream consumers have been created, so there is nothing left to do for it.
    // Returning before recursing avoids re-walking shared downstream subgraphs (e.g. diamonds).
    if (processedPTransformIds.contains(pTransformId)) {
      return;
    }

    // Recursively ensure that all consumers of the output PCollection have been created.
    // Since we are creating the consumers first, we know that we are building the DAG
    // in reverse topological order.
    for (String pCollectionId : pTransform.getOutputsMap().values()) {
      for (String consumingPTransformId : pCollectionIdsToConsumingPTransforms.get(pCollectionId)) {
        createRunnerAndConsumersForPTransformRecursively(
            beamFnStateClient,
            queueingClient,
            consumingPTransformId,
            processBundleDescriptor.getTransformsMap().get(consumingPTransformId),
            processBundleInstructionId,
            processBundleDescriptor,
            pCollectionIdsToConsumingPTransforms,
            pCollectionConsumerRegistry,
            processedPTransformIds,
            startFunctionRegistry,
            finishFunctionRegistry,
            splitListener);
      }
    }

    if (!pTransform.hasSpec()) {
      throw new IllegalArgumentException(
          String.format(
              "Cannot process transform with no spec: %s", TextFormat.printToString(pTransform)));
    }

    if (pTransform.getSubtransformsCount() > 0) {
      throw new IllegalArgumentException(
          String.format(
              "Cannot process composite transform: %s", TextFormat.printToString(pTransform)));
    }

    urnToPTransformRunnerFactoryMap
        .getOrDefault(pTransform.getSpec().getUrn(), defaultPTransformRunnerFactory)
        .createRunnerForPTransform(
            options,
            queueingClient,
            beamFnStateClient,
            pTransformId,
            pTransform,
            processBundleInstructionId,
            processBundleDescriptor.getPcollectionsMap(),
            processBundleDescriptor.getCodersMap(),
            processBundleDescriptor.getWindowingStrategiesMap(),
            pCollectionConsumerRegistry,
            startFunctionRegistry,
            finishFunctionRegistry,
            splitListener);
    processedPTransformIds.add(pTransformId);
  }

  /**
   * Returns whether runner construction should start at {@code pTransform}. Only the source URNs
   * checked here are treated as roots; every other transform is reached by walking downstream.
   */
  private static boolean isRootTransform(PTransform pTransform) {
    String urn = pTransform.getSpec().getUrn();
    // TODO: Remove source as a root and have it be triggered by the Runner.
    return DATA_INPUT_URN.equals(urn)
        || JAVA_SOURCE_URN.equals(urn)
        || PTransformTranslation.READ_TRANSFORM_URN.equals(urn);
  }

  /** Builds a multimap from each PCollection id to the ids of the PTransforms consuming it. */
  private static SetMultimap<String, String> buildPCollectionIdsToConsumingPTransforms(
      ProcessBundleDescriptor bundleDescriptor) {
    SetMultimap<String, String> pCollectionIdsToConsumingPTransforms = HashMultimap.create();
    for (Map.Entry<String, RunnerApi.PTransform> entry :
        bundleDescriptor.getTransformsMap().entrySet()) {
      for (String pCollectionId : entry.getValue().getInputsMap().values()) {
        pCollectionIdsToConsumingPTransforms.put(pCollectionId, entry.getKey());
      }
    }
    return pCollectionIdsToConsumingPTransforms;
  }

  /**
   * Processes a bundle, running the start(), process(), and finish() functions. This function is
   * required to be reentrant.
   *
   * @param request an instruction whose process bundle payload references a registered {@link
   *     ProcessBundleDescriptor}
   * @return a response builder carrying residual roots (if any) and monitoring infos
   * @throws Exception if building runners, running start/finish functions, or draining data fails
   */
  public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request)
      throws Exception {
    // Note: We must create one instance of the QueueingBeamFnDataClient as it is designed to
    // handle the life of a bundle. It will insert elements onto a queue and drain them off so all
    // process() calls will execute on this thread when queueingClient.drainAndBlock() is called.
    QueueingBeamFnDataClient queueingClient = new QueueingBeamFnDataClient(this.beamFnDataClient);

    String bundleDescriptorId = request.getProcessBundle().getProcessBundleDescriptorReference();
    BeamFnApi.ProcessBundleDescriptor bundleDescriptor =
        (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleDescriptorId);

    SetMultimap<String, String> pCollectionIdsToConsumingPTransforms =
        buildPCollectionIdsToConsumingPTransforms(bundleDescriptor);

    MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
    ExecutionStateTracker stateTracker =
        new ExecutionStateTracker(ExecutionStateSampler.instance());
    PCollectionConsumerRegistry pCollectionConsumerRegistry =
        new PCollectionConsumerRegistry(metricsContainerRegistry, stateTracker);
    HashSet<String> processedPTransformIds = new HashSet<>();

    PTransformFunctionRegistry startFunctionRegistry =
        new PTransformFunctionRegistry(
            metricsContainerRegistry, stateTracker, ExecutionStateTracker.START_STATE_NAME);
    PTransformFunctionRegistry finishFunctionRegistry =
        new PTransformFunctionRegistry(
            metricsContainerRegistry, stateTracker, ExecutionStateTracker.FINISH_STATE_NAME);

    ProcessBundleResponse.Builder response = ProcessBundleResponse.newBuilder();

    // Instantiate a State API call handler depending on whether a State Api service descriptor
    // was specified. Closing it at the end of the bundle waits for outstanding state calls.
    try (HandleStateCallsForBundle beamFnStateClient =
        bundleDescriptor.hasStateApiServiceDescriptor()
            ? new BlockTillStateCallsFinish(
                beamFnStateGrpcClientCache.forApiServiceDescriptor(
                    bundleDescriptor.getStateApiServiceDescriptor()))
            : new FailAllStateCallsForBundle(request.getProcessBundle())) {
      Multimap<String, BundleApplication> allPrimaries = ArrayListMultimap.create();
      Multimap<String, DelayedBundleApplication> allResiduals = ArrayListMultimap.create();
      BundleSplitListener splitListener =
          (List<BundleApplication> primaries, List<DelayedBundleApplication> residuals) -> {
            // Reset primaries and accumulate residuals.
            Multimap<String, BundleApplication> newPrimaries = ArrayListMultimap.create();
            for (BundleApplication primary : primaries) {
              newPrimaries.put(primary.getPtransformId(), primary);
            }
            allPrimaries.clear();
            allPrimaries.putAll(newPrimaries);

            for (DelayedBundleApplication residual : residuals) {
              allResiduals.put(residual.getApplication().getPtransformId(), residual);
            }
          };

      // Create runners starting from every root transform; each call also creates all transforms
      // downstream of that root.
      for (Map.Entry<String, RunnerApi.PTransform> entry :
          bundleDescriptor.getTransformsMap().entrySet()) {
        // Skip anything which isn't a root.
        if (!isRootTransform(entry.getValue())) {
          continue;
        }

        createRunnerAndConsumersForPTransformRecursively(
            beamFnStateClient,
            queueingClient,
            entry.getKey(),
            entry.getValue(),
            request::getInstructionId,
            bundleDescriptor,
            pCollectionIdsToConsumingPTransforms,
            pCollectionConsumerRegistry,
            processedPTransformIds,
            startFunctionRegistry,
            finishFunctionRegistry,
            splitListener);
      }

      try (Closeable closeTracker = stateTracker.activate()) {
        // Already in reverse topological order so we don't need to do anything.
        for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
          LOG.debug("Starting function {}", startFunction);
          startFunction.run();
        }

        queueingClient.drainAndBlock();

        // Need to reverse this since we want to call finish in topological order.
        for (ThrowingRunnable finishFunction :
            Lists.reverse(finishFunctionRegistry.getFunctions())) {
          LOG.debug("Finishing function {}", finishFunction);
          finishFunction.run();
        }
        if (!allResiduals.isEmpty()) {
          response.addAllResidualRoots(allResiduals.values());
        }
      }

      // Get start bundle Execution Time Metrics.
      for (MonitoringInfo mi : startFunctionRegistry.getExecutionTimeMonitoringInfos()) {
        response.addMonitoringInfos(mi);
      }
      // Get process bundle Execution Time Metrics.
      for (MonitoringInfo mi : pCollectionConsumerRegistry.getExecutionTimeMonitoringInfos()) {
        response.addMonitoringInfos(mi);
      }
      // Get finish bundle Execution Time Metrics.
      for (MonitoringInfo mi : finishFunctionRegistry.getExecutionTimeMonitoringInfos()) {
        response.addMonitoringInfos(mi);
      }
      // Extract all other MonitoringInfos other than the execution time monitoring infos.
      for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
        response.addMonitoringInfos(mi);
      }
    }

    return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
  }

  /**
   * A {@link BeamFnStateClient} which counts the number of outstanding {@link StateRequest}s and
   * blocks till they are all finished.
   */
  private static class BlockTillStateCallsFinish extends HandleStateCallsForBundle {
    private final BeamFnStateClient beamFnStateClient;
    private final Phaser phaser;
    private int currentPhase;

    private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) {
      this.beamFnStateClient = beamFnStateClient;
      this.phaser = new Phaser(1 /* initial party is the process bundle handler */);
      this.currentPhase = phaser.getPhase();
    }

    /**
     * Arrives on behalf of the process bundle handler and waits until every in-flight state
     * request has completed (and therefore arrived and deregistered).
     */
    @Override
    public void close() throws Exception {
      int unarrivedParties = phaser.getUnarrivedParties();
      if (unarrivedParties > 0) {
        LOG.debug(
            "Waiting for {} parties to arrive before closing, current phase {}.",
            unarrivedParties,
            currentPhase);
      }
      currentPhase = phaser.arriveAndAwaitAdvance();
    }

    @Override
    @SuppressWarnings("FutureReturnValueIgnored") // async arriveAndDeregister task doesn't need
    // monitoring.
    public void handle(
        StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
      // Register each request with the phaser and arrive and deregister each time a request
      // completes. Registration happens before delegating so that a response which completes
      // immediately still has a registered party to deregister.
      phaser.register();
      response.whenComplete((stateResponse, throwable) -> phaser.arriveAndDeregister());
      try {
        beamFnStateClient.handle(requestBuilder, response);
      } catch (RuntimeException | Error e) {
        // A delegate that fails synchronously may never complete the future, which would leave
        // the registered party unarrived and make close() block forever. Completing the future
        // here fires the deregistration above; it is a no-op if the future was already completed.
        response.completeExceptionally(e);
        throw e;
      }
    }
  }

  /**
   * A {@link BeamFnStateClient} which fails all requests because the {@link ProcessBundleRequest}
   * does not contain a State API {@link ApiServiceDescriptor}.
   */
  private static class FailAllStateCallsForBundle extends HandleStateCallsForBundle {
    private final ProcessBundleRequest request;

    private FailAllStateCallsForBundle(ProcessBundleRequest request) {
      this.request = request;
    }

    @Override
    public void close() throws Exception {
      // no-op
    }

    @Override
    public void handle(Builder requestBuilder, CompletableFuture<StateResponse> response) {
      throw new IllegalStateException(
          String.format(
              "State API calls are unsupported because the "
                  + "ProcessBundleRequest %s does not support state.",
              request));
    }
  }

  /** A {@link BeamFnStateClient} scoped to one bundle and closed when that bundle completes. */
  private abstract static class HandleStateCallsForBundle
      implements AutoCloseable, BeamFnStateClient {}

  /**
   * Fallback factory used for URNs without a registered {@link PTransformRunnerFactory}; it always
   * fails, reporting the URNs that are known.
   */
  private static class UnknownPTransformRunnerFactory implements PTransformRunnerFactory<Object> {
    private final Set<String> knownUrns;

    private UnknownPTransformRunnerFactory(Set<String> knownUrns) {
      this.knownUrns = knownUrns;
    }

    @Override
    public Object createRunnerForPTransform(
        PipelineOptions pipelineOptions,
        BeamFnDataClient beamFnDataClient,
        BeamFnStateClient beamFnStateClient,
        String pTransformId,
        PTransform pTransform,
        Supplier<String> processBundleInstructionId,
        Map<String, PCollection> pCollections,
        Map<String, Coder> coders,
        Map<String, WindowingStrategy> windowingStrategies,
        PCollectionConsumerRegistry pCollectionConsumerRegistry,
        PTransformFunctionRegistry startFunctionRegistry,
        PTransformFunctionRegistry finishFunctionRegistry,
        BundleSplitListener splitListener) {
      String message =
          String.format(
              "No factory registered for %s, known factories %s",
              pTransform.getSpec().getUrn(), knownUrns);
      LOG.error(message);
      throw new IllegalStateException(message);
    }
  }
}