| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core.construction.graph; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Components; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.ComponentsOrBuilder; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.runners.PTransformOverride; |
| |
| /** |
| * A way to apply a Proto-based {@link PTransformOverride}. |
| * |
| * <p>This should generally be used to replace runner-executed transforms with runner-executed |
| * composites and simpler runner-executed primitives. It is generically less powerful than the |
| * native {@link org.apache.beam.sdk.Pipeline#replaceAll(List)} and more error-prone, so should only |
| * be used for relatively simple replacements. |
| */ |
| @Experimental |
| public class ProtoOverrides { |
| /** |
| * Update all composites present in the {@code originalPipeline} with an URN equal to the provided |
| * {@code urn} using the provided {@link TransformReplacement}. |
| */ |
| public static Pipeline updateTransform( |
| String urn, Pipeline originalPipeline, TransformReplacement compositeBuilder) { |
| Components.Builder resultComponents = originalPipeline.getComponents().toBuilder(); |
| for (Map.Entry<String, PTransform> pt : |
| originalPipeline.getComponents().getTransformsMap().entrySet()) { |
| if (pt.getValue().getSpec() != null && urn.equals(pt.getValue().getSpec().getUrn())) { |
| MessageWithComponents updated = |
| compositeBuilder.getReplacement(pt.getKey(), originalPipeline.getComponents()); |
| checkArgument( |
| updated.getPtransform().getOutputsMap().equals(pt.getValue().getOutputsMap()), |
| "A %s must produce all of the outputs of the original %s", |
| TransformReplacement.class.getSimpleName(), |
| PTransform.class.getSimpleName()); |
| removeSubtransforms(pt.getValue(), resultComponents); |
| resultComponents |
| .mergeFrom(updated.getComponents()) |
| .putTransforms(pt.getKey(), updated.getPtransform()); |
| } |
| } |
| return originalPipeline.toBuilder().setComponents(resultComponents).build(); |
| } |
| |
| /** |
| * Remove all subtransforms of the provided transform recursively.A {@link PTransform} can be the |
| * subtransform of only one enclosing transform. |
| */ |
| private static void removeSubtransforms(PTransform pt, Components.Builder target) { |
| for (String subtransformId : pt.getSubtransformsList()) { |
| PTransform subtransform = target.getTransformsOrThrow(subtransformId); |
| removeSubtransforms(subtransform, target); |
| target.removeTransforms(subtransformId); |
| // TODO: remove PCollections not produced by 'pt' here. |
| } |
| } |
| |
| /** |
| * A Function that takes a transform and the existing components and returns the new composite |
| * PTransform and additional components. |
| */ |
| @FunctionalInterface |
| public interface TransformReplacement { |
| /** |
| * Returns the updated composite structure for the provided {@link PTransform}. |
| * |
| * <p>The returned {@link MessageWithComponents} must contain a single {@link PTransform}. The |
| * result {@link Components} will be merged into the existing components, and the result {@link |
| * PTransform} will be set as a replacement of the original {@link PTransform}. Notably, this |
| * does not require that the {@code existingComponents} are present in the returned {@link |
| * MessageWithComponents}. |
| * |
| * <p>Introduced components must not collide with any components in the existing components. |
| */ |
| MessageWithComponents getReplacement( |
| String transformId, ComponentsOrBuilder existingComponents); |
| } |
| } |