| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core.construction.graph; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.Optional; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; |
| import org.apache.beam.runners.core.construction.PTransformTranslation; |
| import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; |
| import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; |
| import org.apache.beam.sdk.transforms.Flatten; |
| import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A Fuser that constructs a fused pipeline by fusing as many PCollections into a stage as possible. |
| */ |
| class GreedyPCollectionFusers { |
| private static final Logger LOG = LoggerFactory.getLogger(GreedyPCollectionFusers.class); |
| |
| private static final Map<String, FusibilityChecker> URN_FUSIBILITY_CHECKERS = |
| ImmutableMap.<String, FusibilityChecker>builder() |
| .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, GreedyPCollectionFusers::canFuseParDo) |
| .put( |
| PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN, |
| GreedyPCollectionFusers::canFuseParDo) |
| .put( |
| PTransformTranslation.SPLITTABLE_SPLIT_RESTRICTION_URN, |
| GreedyPCollectionFusers::canFuseParDo) |
| .put( |
| PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN, |
| GreedyPCollectionFusers::cannotFuse) |
| .put( |
| PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, |
| GreedyPCollectionFusers::cannotFuse) |
| .put( |
| PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, |
| GreedyPCollectionFusers::canFuseParDo) |
| .put( |
| PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN, |
| GreedyPCollectionFusers::cannotFuse) |
| .put( |
| PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN, |
| GreedyPCollectionFusers::canFuseCompatibleEnvironment) |
| .put( |
| PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN, |
| GreedyPCollectionFusers::canFuseCompatibleEnvironment) |
| .put( |
| PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN, |
| GreedyPCollectionFusers::canFuseCompatibleEnvironment) |
| .put( |
| PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, |
| GreedyPCollectionFusers::canFuseCompatibleEnvironment) |
| .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, GreedyPCollectionFusers::canAlwaysFuse) |
| .put( |
| // GroupByKeys are runner-implemented only. PCollections consumed by a GroupByKey must |
| // be materialized |
| PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, GreedyPCollectionFusers::cannotFuse) |
| .put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, GreedyPCollectionFusers::cannotFuse) |
| .build(); |
| private static final FusibilityChecker DEFAULT_FUSIBILITY_CHECKER = |
| GreedyPCollectionFusers::unknownTransformFusion; |
| |
| // TODO: Migrate |
| private static final Map<String, CompatibilityChecker> URN_COMPATIBILITY_CHECKERS = |
| ImmutableMap.<String, CompatibilityChecker>builder() |
| .put( |
| PTransformTranslation.PAR_DO_TRANSFORM_URN, |
| GreedyPCollectionFusers::parDoCompatibility) |
| .put( |
| PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN, |
| GreedyPCollectionFusers::parDoCompatibility) |
| .put( |
| PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, |
| GreedyPCollectionFusers::parDoCompatibility) |
| .put( |
| PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN, |
| GreedyPCollectionFusers::parDoCompatibility) |
| .put( |
| PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN, |
| GreedyPCollectionFusers::compatibleEnvironments) |
| .put( |
| PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN, |
| GreedyPCollectionFusers::compatibleEnvironments) |
| .put( |
| PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN, |
| GreedyPCollectionFusers::compatibleEnvironments) |
| .put( |
| PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, |
| GreedyPCollectionFusers::compatibleEnvironments) |
| .put( |
| PTransformTranslation.FLATTEN_TRANSFORM_URN, GreedyPCollectionFusers::noCompatibility) |
| .put( |
| PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, |
| GreedyPCollectionFusers::noCompatibility) |
| .put( |
| PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, |
| GreedyPCollectionFusers::noCompatibility) |
| .build(); |
| private static final CompatibilityChecker DEFAULT_COMPATIBILITY_CHECKER = |
| GreedyPCollectionFusers::unknownTransformCompatibility; |
| |
| /** Returns true if the PTransform node for the given input PCollection can be fused across. */ |
| public static boolean canFuse( |
| PTransformNode transformNode, |
| Environment environment, |
| PCollectionNode candidate, |
| Collection<PCollectionNode> stagePCollections, |
| QueryablePipeline pipeline) { |
| return URN_FUSIBILITY_CHECKERS |
| .getOrDefault(transformNode.getTransform().getSpec().getUrn(), DEFAULT_FUSIBILITY_CHECKER) |
| .canFuse(transformNode, environment, candidate, stagePCollections, pipeline); |
| } |
| |
| /** |
| * Returns true if the two PTransforms are compatible such that they can be executed in the same |
| * environment. |
| */ |
| public static boolean isCompatible( |
| PTransformNode left, PTransformNode right, QueryablePipeline pipeline) { |
| CompatibilityChecker leftChecker = |
| URN_COMPATIBILITY_CHECKERS.getOrDefault( |
| left.getTransform().getSpec().getUrn(), DEFAULT_COMPATIBILITY_CHECKER); |
| CompatibilityChecker rightChecker = |
| URN_COMPATIBILITY_CHECKERS.getOrDefault( |
| right.getTransform().getSpec().getUrn(), DEFAULT_COMPATIBILITY_CHECKER); |
| // The nodes are mutually compatible |
| return leftChecker.isCompatible(left, right, pipeline) |
| && rightChecker.isCompatible(right, left, pipeline); |
| } |
| |
| // For the following methods, these should be called if the ptransform is consumed by a |
| // PCollection output by the ExecutableStage, to determine if it can be fused into that |
| // Subgraph |
| |
| private interface FusibilityChecker { |
| /** |
| * Determine if a {@link PTransformNode} can be fused into an existing {@link ExecutableStage}. |
| */ |
| boolean canFuse( |
| PTransformNode transformNode, |
| Environment environment, |
| @SuppressWarnings("unused") PCollectionNode candidate, |
| Collection<PCollectionNode> stagePCollections, |
| QueryablePipeline pipeline); |
| } |
| |
| private interface CompatibilityChecker { |
| /** |
| * Determine if two {@link PTransformNode PTransforms} can be fused into a new stage. This |
| * determines sibling fusion for new {@link ExecutableStage stages}. |
| */ |
| boolean isCompatible( |
| PTransformNode newNode, PTransformNode otherNode, QueryablePipeline pipeline); |
| } |
| |
| /** |
| * A ParDo can be fused into a stage if it executes in the same Environment as that stage, and no |
| * transform that are upstream of any of its side input are present in that stage. |
| * |
| * <p>A ParDo that consumes a side input cannot process an element until all of the side inputs |
| * contain data for the side input window that contains the element. |
| */ |
| private static boolean canFuseParDo( |
| PTransformNode parDo, |
| Environment environment, |
| PCollectionNode candidate, |
| Collection<PCollectionNode> stagePCollections, |
| QueryablePipeline pipeline) { |
| Optional<Environment> env = pipeline.getEnvironment(parDo); |
| checkArgument( |
| env.isPresent(), |
| "A %s must have an %s associated with it", |
| ParDoPayload.class.getSimpleName(), |
| Environment.class.getSimpleName()); |
| if (!env.get().equals(environment)) { |
| // The PCollection's producer and this ParDo execute in different environments, so fusion |
| // is never possible. |
| return false; |
| } |
| try { |
| ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload()); |
| if (Maps.filterKeys( |
| parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s)) |
| .values() |
| .contains(candidate.getId())) { |
| // Allow fusion across timer PCollections because they are a self loop. |
| return true; |
| } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) { |
| // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for |
| // a key must execute serially. To avoid checking if the rest of the stage is |
| // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage. |
| return false; |
| } else if (!pipeline.getSideInputs(parDo).isEmpty()) { |
| // At execution time, a Runner is required to only provide inputs to a PTransform that, at |
| // the time the PTransform processes them, the associated window is ready in all side inputs |
| // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the |
| // runner to determine this for each input. As a result, we break fusion to simplify this |
| // inspection. In general, a ParDo which consumes side inputs cannot be fused into an |
| // executable stage alongside any transforms which are upstream of any of its side inputs. |
| return false; |
| } |
| } catch (InvalidProtocolBufferException e) { |
| throw new IllegalArgumentException(e); |
| } |
| return true; |
| } |
| |
| private static boolean parDoCompatibility( |
| PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) { |
| // Implicitly true if we are attempting to fuse against oneself. This case comes up for |
| // PCollections representing timers since they create a self-loop in the graph. |
| return parDo.equals(other) |
| // This is a convenience rather than a strict requirement. In general, a ParDo that consumes |
| // side inputs can be fused with other transforms in the same environment which are not |
| // upstream of any of the side inputs. |
| || (pipeline.getSideInputs(parDo).isEmpty() |
| // We purposefully break fusion here to provide runners the opportunity to insert a |
| // grouping operation to simplify implementing support for ParDo's that contain user |
| // state. |
| // We would not need to do this if we had the ability to mark upstream transforms as |
| // key preserving or if runners could execute ParDos containing user state in a |
| // distributed |
| // fashion for a single key. |
| && pipeline.getUserStates(parDo).isEmpty() |
| // We purposefully break fusion here to provide runners the opportunity to insert a |
| // grouping operation to simplify implementing support for ParDo's that contain timers. |
| // We would not need to do this if we had the ability to mark upstream transforms as |
| // key preserving or if runners could execute ParDos containing timers in a distributed |
| // fashion for a single key. |
| && pipeline.getTimers(parDo).isEmpty() |
| && compatibleEnvironments(parDo, other, pipeline)); |
| } |
| |
| /** |
| * A WindowInto can be fused into a stage if it executes in the same Environment as that stage. |
| */ |
| private static boolean canFuseCompatibleEnvironment( |
| PTransformNode operation, |
| Environment environment, |
| @SuppressWarnings("unused") PCollectionNode candidate, |
| @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections, |
| QueryablePipeline pipeline) { |
| // WindowInto transforms may not have an environment |
| Optional<Environment> operationEnvironment = pipeline.getEnvironment(operation); |
| return environment.equals(operationEnvironment.orElse(null)); |
| } |
| |
| private static boolean compatibleEnvironments( |
| PTransformNode left, PTransformNode right, QueryablePipeline pipeline) { |
| return pipeline.getEnvironment(left).equals(pipeline.getEnvironment(right)); |
| } |
| |
| /** |
| * Flatten can be fused into any stage. |
| * |
| * <p>If the assumption that for each {@link PCollection}, an element is produced in that {@link |
| * PCollection} via a single path through the {@link Pipeline} DAG, a Flatten can appear in each |
| * stage that produces any of its input {@link PCollection PCollections}, as all of its inputs |
| * will reach it via only one of those paths. |
| * |
| * <p>As flatten consumes multiple inputs and produces a single output, there are two cases that |
| * must be considered for the inputs. |
| * |
| * <ul> |
| * <li>All of the producers of all inputs are within a single stage |
| * <li>The producers of the inputs are in two or more stages |
| * </ul> |
| * |
| * <p>If all of the producers exist within a single stage, this is identical to any other |
| * transform that consumes a single input - the output PCollection is materialized based only on |
| * its consumers. |
| * |
| * <p>If the producers exist within separate stages, there are two other considerations: |
| * |
| * <ul> |
| * <li>The output PCollection must be materialized in all cases (has consumers which cannot be |
| * fused into at least one of the upstream stages). |
| * <li>All of the consumers of the output PCollection can be fused into at least one of the |
| * producing stages. |
| * </ul> |
| * |
| * <p>For the former case, this again is identical to a transform that produces a materialized |
| * {@link PCollection}; each path to the {@link Flatten} produces elements for the input {@link |
| * PCollection}, and the output is materialized and consumed by downstream transforms identically |
| * to any other materialized {@link PCollection}. |
| * |
| * <p>For the latter, where fusion is possible into at least one of the producer stages, Flatten |
| * unzipping is performed. This consists of the following steps: |
| * |
| * <ol> |
| * <li>The flatten is fused into each upstream stage |
| * <li>Each stage which contains a producer that can be fused with the output {@link |
| * PCollection} fuses that {@link PCollection}. Elements produced by that stage for the |
| * output of the flatten are never materialized. |
| * <li>Each stage which cannot be fused with the output {@link PCollection} materializes the |
| * output of the {@link Flatten}. All of the downstream consumers exist in a stage which |
| * reads from the output of that {@link Flatten}, which contains all of the elements from |
| * the stages that could not fuse with those consumers. |
| * </ol> |
| */ |
| private static boolean canAlwaysFuse( |
| @SuppressWarnings("unused") PTransformNode flatten, |
| @SuppressWarnings("unused") Environment environment, |
| @SuppressWarnings("unused") PCollectionNode candidate, |
| @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections, |
| @SuppressWarnings("unused") QueryablePipeline pipeline) { |
| return true; |
| } |
| |
| private static boolean cannotFuse( |
| @SuppressWarnings("unused") PTransformNode cannotFuse, |
| @SuppressWarnings("unused") Environment environment, |
| @SuppressWarnings("unused") PCollectionNode candidate, |
| @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections, |
| @SuppressWarnings("unused") QueryablePipeline pipeline) { |
| return false; |
| } |
| |
| private static boolean noCompatibility( |
| @SuppressWarnings("unused") PTransformNode self, |
| @SuppressWarnings("unused") PTransformNode other, |
| @SuppressWarnings("unused") QueryablePipeline pipeline) { |
| // TODO: There is performance to be gained if the output of a flatten is fused into a stage |
| // where its output is wholly consumed after a fusion break. This requires slightly more |
| // lookahead. |
| return false; |
| } |
| |
| // Things with unknown URNs either execute within their own stage or are executed by the runner. |
| // In either case, assume the system is capable of executing the expressed transform |
| private static boolean unknownTransformFusion( |
| PTransformNode transform, |
| @SuppressWarnings("unused") Environment environment, |
| @SuppressWarnings("unused") PCollectionNode candidate, |
| @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections, |
| @SuppressWarnings("unused") QueryablePipeline pipeline) { |
| LOG.debug( |
| "Unknown {} {} will not fuse into an existing {}", |
| PTransform.class.getSimpleName(), |
| transform.getTransform(), |
| ExecutableStage.class.getSimpleName(), |
| PTransform.class.getSimpleName()); |
| return false; |
| } |
| |
| // Things with unknown URNs either execute within their own stage or are executed by the runner. |
| // In either case, assume the system is capable of executing the expressed transform |
| private static boolean unknownTransformCompatibility( |
| PTransformNode transform, |
| @SuppressWarnings("unused") PTransformNode other, |
| @SuppressWarnings("unused") QueryablePipeline pipeline) { |
| LOG.debug( |
| "Unknown {} {} will not root a {} with other {}", |
| PTransform.class.getSimpleName(), |
| transform.getTransform(), |
| ExecutableStage.class.getSimpleName(), |
| PTransform.class.getSimpleName()); |
| return false; |
| } |
| } |