| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.fn.harness.control; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Phaser; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import org.apache.beam.fn.harness.PTransformRunnerFactory; |
| import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar; |
| import org.apache.beam.fn.harness.data.BeamFnDataClient; |
| import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry; |
| import org.apache.beam.fn.harness.data.PTransformFunctionRegistry; |
| import org.apache.beam.fn.harness.data.QueueingBeamFnDataClient; |
| import org.apache.beam.fn.harness.state.BeamFnStateClient; |
| import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.Builder; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; |
| import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; |
| import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; |
| import org.apache.beam.model.pipeline.v1.RunnerApi; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Coder; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy; |
| import org.apache.beam.runners.core.construction.PTransformTranslation; |
| import org.apache.beam.runners.core.metrics.ExecutionStateSampler; |
| import org.apache.beam.runners.core.metrics.ExecutionStateTracker; |
| import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; |
| import org.apache.beam.sdk.function.ThrowingRunnable; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.util.common.ReflectHelpers; |
| import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Message; |
| import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.TextFormat; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashMultimap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.SetMultimap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Processes {@link BeamFnApi.ProcessBundleRequest}s by materializing the set of required runners |
| * for each {@link RunnerApi.FunctionSpec}, wiring them together based upon the {@code input} and |
| * {@code output} map definitions. |
| * |
| * <p>Finally executes the DAG based graph by starting all runners in reverse topological order, and |
| * finishing all runners in forward topological order. |
| */ |
| public class ProcessBundleHandler { |
| |
| // TODO: What should the initial set of URNs be? |
| private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1"; |
| public static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1"; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class); |
| private static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES; |
| |
| static { |
| Set<Registrar> pipelineRunnerRegistrars = |
| Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); |
| pipelineRunnerRegistrars.addAll( |
| Lists.newArrayList(ServiceLoader.load(Registrar.class, ReflectHelpers.findClassLoader()))); |
| |
| // Load all registered PTransform runner factories. |
| ImmutableMap.Builder<String, PTransformRunnerFactory> builder = ImmutableMap.builder(); |
| for (Registrar registrar : pipelineRunnerRegistrars) { |
| builder.putAll(registrar.getPTransformRunnerFactories()); |
| } |
| REGISTERED_RUNNER_FACTORIES = builder.build(); |
| } |
| |
| private final PipelineOptions options; |
| private final Function<String, Message> fnApiRegistry; |
| private final BeamFnDataClient beamFnDataClient; |
| private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache; |
| private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap; |
| private final PTransformRunnerFactory defaultPTransformRunnerFactory; |
| |
| public ProcessBundleHandler( |
| PipelineOptions options, |
| Function<String, Message> fnApiRegistry, |
| BeamFnDataClient beamFnDataClient, |
| BeamFnStateGrpcClientCache beamFnStateGrpcClientCache) { |
| this( |
| options, |
| fnApiRegistry, |
| beamFnDataClient, |
| beamFnStateGrpcClientCache, |
| REGISTERED_RUNNER_FACTORIES); |
| } |
| |
| @VisibleForTesting |
| ProcessBundleHandler( |
| PipelineOptions options, |
| Function<String, Message> fnApiRegistry, |
| BeamFnDataClient beamFnDataClient, |
| BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, |
| Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) { |
| this.options = options; |
| this.fnApiRegistry = fnApiRegistry; |
| this.beamFnDataClient = beamFnDataClient; |
| this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache; |
| this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap; |
| this.defaultPTransformRunnerFactory = |
| new UnknownPTransformRunnerFactory(urnToPTransformRunnerFactoryMap.keySet()); |
| } |
| |
| private void createRunnerAndConsumersForPTransformRecursively( |
| BeamFnStateClient beamFnStateClient, |
| BeamFnDataClient queueingClient, |
| String pTransformId, |
| PTransform pTransform, |
| Supplier<String> processBundleInstructionId, |
| ProcessBundleDescriptor processBundleDescriptor, |
| SetMultimap<String, String> pCollectionIdsToConsumingPTransforms, |
| PCollectionConsumerRegistry pCollectionConsumerRegistry, |
| Set<String> processedPTransformIds, |
| PTransformFunctionRegistry startFunctionRegistry, |
| PTransformFunctionRegistry finishFunctionRegistry, |
| BundleSplitListener splitListener) |
| throws IOException { |
| |
| // Recursively ensure that all consumers of the output PCollection have been created. |
| // Since we are creating the consumers first, we know that the we are building the DAG |
| // in reverse topological order. |
| for (String pCollectionId : pTransform.getOutputsMap().values()) { |
| |
| for (String consumingPTransformId : pCollectionIdsToConsumingPTransforms.get(pCollectionId)) { |
| createRunnerAndConsumersForPTransformRecursively( |
| beamFnStateClient, |
| queueingClient, |
| consumingPTransformId, |
| processBundleDescriptor.getTransformsMap().get(consumingPTransformId), |
| processBundleInstructionId, |
| processBundleDescriptor, |
| pCollectionIdsToConsumingPTransforms, |
| pCollectionConsumerRegistry, |
| processedPTransformIds, |
| startFunctionRegistry, |
| finishFunctionRegistry, |
| splitListener); |
| } |
| } |
| |
| if (!pTransform.hasSpec()) { |
| throw new IllegalArgumentException( |
| String.format( |
| "Cannot process transform with no spec: %s", TextFormat.printToString(pTransform))); |
| } |
| |
| if (pTransform.getSubtransformsCount() > 0) { |
| throw new IllegalArgumentException( |
| String.format( |
| "Cannot process composite transform: %s", TextFormat.printToString(pTransform))); |
| } |
| // Skip reprocessing processed pTransforms. |
| if (!processedPTransformIds.contains(pTransformId)) { |
| urnToPTransformRunnerFactoryMap |
| .getOrDefault(pTransform.getSpec().getUrn(), defaultPTransformRunnerFactory) |
| .createRunnerForPTransform( |
| options, |
| queueingClient, |
| beamFnStateClient, |
| pTransformId, |
| pTransform, |
| processBundleInstructionId, |
| processBundleDescriptor.getPcollectionsMap(), |
| processBundleDescriptor.getCodersMap(), |
| processBundleDescriptor.getWindowingStrategiesMap(), |
| pCollectionConsumerRegistry, |
| startFunctionRegistry, |
| finishFunctionRegistry, |
| splitListener); |
| processedPTransformIds.add(pTransformId); |
| } |
| } |
| |
| /** |
| * Processes a bundle, running the start(), process(), and finish() functions. This function is |
| * required to be reentrant. |
| */ |
| public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request) |
| throws Exception { |
| // Note: We must create one instance of the QueueingBeamFnDataClient as it is designed to |
| // handle the life of a bundle. It will insert elements onto a queue and drain them off so all |
| // process() calls will execute on this thread when queueingClient.drainAndBlock() is called. |
| QueueingBeamFnDataClient queueingClient = new QueueingBeamFnDataClient(this.beamFnDataClient); |
| |
| String bundleId = request.getProcessBundle().getProcessBundleDescriptorReference(); |
| BeamFnApi.ProcessBundleDescriptor bundleDescriptor = |
| (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId); |
| |
| SetMultimap<String, String> pCollectionIdsToConsumingPTransforms = HashMultimap.create(); |
| MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap(); |
| ExecutionStateTracker stateTracker = |
| new ExecutionStateTracker(ExecutionStateSampler.instance()); |
| PCollectionConsumerRegistry pCollectionConsumerRegistry = |
| new PCollectionConsumerRegistry(metricsContainerRegistry, stateTracker); |
| HashSet<String> processedPTransformIds = new HashSet<>(); |
| |
| PTransformFunctionRegistry startFunctionRegistry = |
| new PTransformFunctionRegistry( |
| metricsContainerRegistry, stateTracker, ExecutionStateTracker.START_STATE_NAME); |
| PTransformFunctionRegistry finishFunctionRegistry = |
| new PTransformFunctionRegistry( |
| metricsContainerRegistry, stateTracker, ExecutionStateTracker.FINISH_STATE_NAME); |
| |
| // Build a multimap of PCollection ids to PTransform ids which consume said PCollections |
| for (Map.Entry<String, RunnerApi.PTransform> entry : |
| bundleDescriptor.getTransformsMap().entrySet()) { |
| for (String pCollectionId : entry.getValue().getInputsMap().values()) { |
| pCollectionIdsToConsumingPTransforms.put(pCollectionId, entry.getKey()); |
| } |
| } |
| |
| ProcessBundleResponse.Builder response = ProcessBundleResponse.newBuilder(); |
| |
| // Instantiate a State API call handler depending on whether a State Api service descriptor |
| // was specified. |
| try (HandleStateCallsForBundle beamFnStateClient = |
| bundleDescriptor.hasStateApiServiceDescriptor() |
| ? new BlockTillStateCallsFinish( |
| beamFnStateGrpcClientCache.forApiServiceDescriptor( |
| bundleDescriptor.getStateApiServiceDescriptor())) |
| : new FailAllStateCallsForBundle(request.getProcessBundle())) { |
| Multimap<String, BundleApplication> allPrimaries = ArrayListMultimap.create(); |
| Multimap<String, DelayedBundleApplication> allResiduals = ArrayListMultimap.create(); |
| BundleSplitListener splitListener = |
| (List<BundleApplication> primaries, List<DelayedBundleApplication> residuals) -> { |
| // Reset primaries and accumulate residuals. |
| Multimap<String, BundleApplication> newPrimaries = ArrayListMultimap.create(); |
| for (BundleApplication primary : primaries) { |
| newPrimaries.put(primary.getPtransformId(), primary); |
| } |
| allPrimaries.clear(); |
| allPrimaries.putAll(newPrimaries); |
| |
| for (DelayedBundleApplication residual : residuals) { |
| allResiduals.put(residual.getApplication().getPtransformId(), residual); |
| } |
| }; |
| |
| // Create a BeamFnStateClient |
| for (Map.Entry<String, RunnerApi.PTransform> entry : |
| bundleDescriptor.getTransformsMap().entrySet()) { |
| |
| // Skip anything which isn't a root |
| // TODO: Remove source as a root and have it be triggered by the Runner. |
| if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn()) |
| && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn()) |
| && !PTransformTranslation.READ_TRANSFORM_URN.equals( |
| entry.getValue().getSpec().getUrn())) { |
| continue; |
| } |
| |
| createRunnerAndConsumersForPTransformRecursively( |
| beamFnStateClient, |
| queueingClient, |
| entry.getKey(), |
| entry.getValue(), |
| request::getInstructionId, |
| bundleDescriptor, |
| pCollectionIdsToConsumingPTransforms, |
| pCollectionConsumerRegistry, |
| processedPTransformIds, |
| startFunctionRegistry, |
| finishFunctionRegistry, |
| splitListener); |
| } |
| |
| try (Closeable closeTracker = stateTracker.activate()) { |
| // Already in reverse topological order so we don't need to do anything. |
| for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) { |
| LOG.debug("Starting function {}", startFunction); |
| startFunction.run(); |
| } |
| |
| queueingClient.drainAndBlock(); |
| |
| // Need to reverse this since we want to call finish in topological order. |
| for (ThrowingRunnable finishFunction : |
| Lists.reverse(finishFunctionRegistry.getFunctions())) { |
| LOG.debug("Finishing function {}", finishFunction); |
| finishFunction.run(); |
| } |
| if (!allResiduals.isEmpty()) { |
| response.addAllResidualRoots(allResiduals.values()); |
| } |
| } |
| // Get start bundle Execution Time Metrics. |
| for (MonitoringInfo mi : startFunctionRegistry.getExecutionTimeMonitoringInfos()) { |
| response.addMonitoringInfos(mi); |
| } |
| // Get process bundle Execution Time Metrics. |
| for (MonitoringInfo mi : pCollectionConsumerRegistry.getExecutionTimeMonitoringInfos()) { |
| response.addMonitoringInfos(mi); |
| } |
| |
| // Get finish bundle Execution Time Metrics. |
| for (MonitoringInfo mi : finishFunctionRegistry.getExecutionTimeMonitoringInfos()) { |
| response.addMonitoringInfos(mi); |
| } |
| // Extract all other MonitoringInfos other than the execution time monitoring infos. |
| for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) { |
| response.addMonitoringInfos(mi); |
| } |
| } |
| return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response); |
| } |
| |
| /** |
| * A {@link BeamFnStateClient} which counts the number of outstanding {@link StateRequest}s and |
| * blocks till they are all finished. |
| */ |
| private static class BlockTillStateCallsFinish extends HandleStateCallsForBundle { |
| private final BeamFnStateClient beamFnStateClient; |
| private final Phaser phaser; |
| private int currentPhase; |
| |
| private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) { |
| this.beamFnStateClient = beamFnStateClient; |
| this.phaser = new Phaser(1 /* initial party is the process bundle handler */); |
| this.currentPhase = phaser.getPhase(); |
| } |
| |
| @Override |
| public void close() throws Exception { |
| int unarrivedParties = phaser.getUnarrivedParties(); |
| if (unarrivedParties > 0) { |
| LOG.debug( |
| "Waiting for {} parties to arrive before closing, current phase {}.", |
| unarrivedParties, |
| currentPhase); |
| } |
| currentPhase = phaser.arriveAndAwaitAdvance(); |
| } |
| |
| @Override |
| @SuppressWarnings("FutureReturnValueIgnored") // async arriveAndDeregister task doesn't need |
| // monitoring. |
| public void handle( |
| StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) { |
| // Register each request with the phaser and arrive and deregister each time a request |
| // completes. |
| phaser.register(); |
| response.whenComplete((stateResponse, throwable) -> phaser.arriveAndDeregister()); |
| beamFnStateClient.handle(requestBuilder, response); |
| } |
| } |
| |
| /** |
| * A {@link BeamFnStateClient} which fails all requests because the {@link ProcessBundleRequest} |
| * does not contain a State API {@link ApiServiceDescriptor}. |
| */ |
| private static class FailAllStateCallsForBundle extends HandleStateCallsForBundle { |
| private final ProcessBundleRequest request; |
| |
| private FailAllStateCallsForBundle(ProcessBundleRequest request) { |
| this.request = request; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| // no-op |
| } |
| |
| @Override |
| public void handle(Builder requestBuilder, CompletableFuture<StateResponse> response) { |
| throw new IllegalStateException( |
| String.format( |
| "State API calls are unsupported because the " |
| + "ProcessBundleRequest %s does not support state.", |
| request)); |
| } |
| } |
| |
| private abstract static class HandleStateCallsForBundle |
| implements AutoCloseable, BeamFnStateClient {} |
| |
| private static class UnknownPTransformRunnerFactory implements PTransformRunnerFactory<Object> { |
| private final Set<String> knownUrns; |
| |
| private UnknownPTransformRunnerFactory(Set<String> knownUrns) { |
| this.knownUrns = knownUrns; |
| } |
| |
| @Override |
| public Object createRunnerForPTransform( |
| PipelineOptions pipelineOptions, |
| BeamFnDataClient beamFnDataClient, |
| BeamFnStateClient beamFnStateClient, |
| String pTransformId, |
| PTransform pTransform, |
| Supplier<String> processBundleInstructionId, |
| Map<String, PCollection> pCollections, |
| Map<String, Coder> coders, |
| Map<String, WindowingStrategy> windowingStrategies, |
| PCollectionConsumerRegistry pCollectionConsumerRegistry, |
| PTransformFunctionRegistry startFunctionRegistry, |
| PTransformFunctionRegistry finishFunctionRegistry, |
| BundleSplitListener splitListener) { |
| String message = |
| String.format( |
| "No factory registered for %s, known factories %s", |
| pTransform.getSpec().getUrn(), knownUrns); |
| LOG.error(message); |
| throw new IllegalStateException(message); |
| } |
| } |
| } |