| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core.construction.graph; |
| |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.CREATE_VIEW_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.IMPULSE_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.MAP_WINDOWS_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN; |
| import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.util.ArrayDeque; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Deque; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.StreamSupport; |
| import org.apache.beam.model.pipeline.v1.RunnerApi; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Components; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; |
| import org.apache.beam.runners.core.construction.Environments; |
| import org.apache.beam.runners.core.construction.NativeTransforms; |
| import org.apache.beam.runners.core.construction.PTransformTranslation; |
| import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; |
| import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; |
| import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.graph.MutableNetwork; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.graph.Network; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.graph.NetworkBuilder; |
| |
| /** |
| * A {@link Pipeline} which has additional methods to relate nodes in the graph relative to each |
| * other. |
| */ |
| public class QueryablePipeline { |
| // TODO: Is it better to have the signatures here require nodes in almost all contexts, or should |
| // they all take strings? Nodes gives some degree of type signalling that names might not, but |
| // it's more painful to construct the node. However, right now the traversal is done starting |
| // at the roots and using nodes everywhere based on what any query has returned. |
| /** |
| * Create a new {@link QueryablePipeline} based on the provided components. |
| * |
| * <p>The returned {@link QueryablePipeline} will contain only the primitive transforms present |
| * within the provided components. |
| */ |
| public static QueryablePipeline forPrimitivesIn(Components components) { |
| return new QueryablePipeline(getPrimitiveTransformIds(components), components); |
| } |
| |
| /** |
| * Create a new {@link QueryablePipeline} which uses the root transform IDs and components of the |
| * provided {@link Pipeline}. |
| */ |
| public static QueryablePipeline forPipeline(RunnerApi.Pipeline p) { |
| return forTransforms(p.getRootTransformIdsList(), p.getComponents()); |
| } |
| |
| /** |
| * Create a new {@link QueryablePipeline} based on the provided components containing only the |
| * provided {@code transformIds}. |
| */ |
| public static QueryablePipeline forTransforms( |
| Collection<String> transformIds, Components components) { |
| return new QueryablePipeline(transformIds, components); |
| } |
| |
/** The components this pipeline was built from; returned by {@link #getComponents()}. */
private final Components components;

/**
 * The graph form of the {@link Pipeline}, held as a {@link Network}.
 *
 * <p>The network is directed and bipartite: its nodes are {@link PTransformNode PTransformNodes}
 * and {@link PCollectionNode PCollectionNodes}. A {@link PCollectionNode} has exactly one in edge
 * (from its producer) and any number of out edges, while a {@link PTransformNode} may have any
 * number of in and out edges.
 *
 * <p>Parallel edges are allowed because a single {@link PTransformNode} may consume the same
 * {@link PCollectionNode} several times under different local names.
 */
private final Network<PipelineNode, PipelineEdge> pipelineNetwork;

/**
 * Stores the components and builds the network for the given transforms.
 *
 * @param transformIds the IDs of the transforms that become nodes of the network
 * @param components the components used to resolve transforms and PCollections
 */
private QueryablePipeline(Collection<String> transformIds, Components components) {
  // The field has to be assigned before the network is built: buildNetwork reads it.
  this.components = components;
  Network<PipelineNode, PipelineEdge> network = buildNetwork(transformIds, this.components);
  this.pipelineNetwork = network;
}
| |
| /** Produces a {@link RunnerApi.Components} which contains only primitive transforms. */ |
| @VisibleForTesting |
| static Collection<String> getPrimitiveTransformIds(RunnerApi.Components components) { |
| Collection<String> ids = new LinkedHashSet<>(); |
| |
| for (Map.Entry<String, PTransform> transformEntry : components.getTransformsMap().entrySet()) { |
| PTransform transform = transformEntry.getValue(); |
| boolean isPrimitive = isPrimitiveTransform(transform); |
| if (isPrimitive) { |
| // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested |
| // descendents), due to runners |
| // either rewriting them in terms of runner-specific transforms, or SDKs constructing them |
| // in terms of other |
| // underlying transforms (see https://issues.apache.org/jira/browse/BEAM-5441). |
| // We consider any "leaf" descendents of these "primitive" transforms to be the true |
| // "primitives" that we |
| // preserve here; in the common case, this is just the "primitive" itself, which has no |
| // descendents). |
| Deque<String> transforms = new ArrayDeque<>(); |
| transforms.push(transformEntry.getKey()); |
| while (!transforms.isEmpty()) { |
| String id = transforms.pop(); |
| PTransform next = components.getTransformsMap().get(id); |
| List<String> subtransforms = next.getSubtransformsList(); |
| if (subtransforms.isEmpty()) { |
| ids.add(id); |
| } else { |
| transforms.addAll(subtransforms); |
| } |
| } |
| } |
| } |
| return ids; |
| } |
| |
| private static final Set<String> PRIMITIVE_URNS = |
| ImmutableSet.of( |
| PAR_DO_TRANSFORM_URN, |
| FLATTEN_TRANSFORM_URN, |
| GROUP_BY_KEY_TRANSFORM_URN, |
| IMPULSE_TRANSFORM_URN, |
| ASSIGN_WINDOWS_TRANSFORM_URN, |
| TEST_STREAM_TRANSFORM_URN, |
| MAP_WINDOWS_TRANSFORM_URN, |
| READ_TRANSFORM_URN, |
| CREATE_VIEW_TRANSFORM_URN, |
| COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN, |
| COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN, |
| COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN, |
| SPLITTABLE_PROCESS_KEYED_URN, |
| SPLITTABLE_PROCESS_ELEMENTS_URN, |
| SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, |
| SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN); |
| |
| /** Returns true if the provided transform is a primitive. */ |
| private static boolean isPrimitiveTransform(PTransform transform) { |
| String urn = PTransformTranslation.urnForTransformOrNull(transform); |
| return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform); |
| } |
| |
| private MutableNetwork<PipelineNode, PipelineEdge> buildNetwork( |
| Collection<String> transformIds, Components components) { |
| MutableNetwork<PipelineNode, PipelineEdge> network = |
| NetworkBuilder.directed().allowsParallelEdges(true).allowsSelfLoops(false).build(); |
| Set<PCollectionNode> unproducedCollections = new HashSet<>(); |
| for (String transformId : transformIds) { |
| PTransform transform = components.getTransformsOrThrow(transformId); |
| PTransformNode transformNode = |
| PipelineNode.pTransform(transformId, this.components.getTransformsOrThrow(transformId)); |
| network.addNode(transformNode); |
| for (String produced : transform.getOutputsMap().values()) { |
| PCollectionNode producedNode = |
| PipelineNode.pCollection(produced, components.getPcollectionsOrThrow(produced)); |
| network.addNode(producedNode); |
| network.addEdge(transformNode, producedNode, new PerElementEdge()); |
| checkArgument( |
| network.inDegree(producedNode) == 1, |
| "A %s should have exactly one producing %s, but found %s:\nPCollection:\n%s\nProducers:\n%s", |
| PCollectionNode.class.getSimpleName(), |
| PTransformNode.class.getSimpleName(), |
| network.predecessors(producedNode).size(), |
| producedNode, |
| network.predecessors(producedNode)); |
| unproducedCollections.remove(producedNode); |
| } |
| for (Map.Entry<String, String> consumed : transform.getInputsMap().entrySet()) { |
| // This loop may add an edge between the consumed PCollection and the current PTransform. |
| // The local name of the transform must be used to determine the type of edge. |
| String pcollectionId = consumed.getValue(); |
| PCollectionNode consumedNode = |
| PipelineNode.pCollection( |
| pcollectionId, this.components.getPcollectionsOrThrow(pcollectionId)); |
| if (network.addNode(consumedNode)) { |
| // This node has been added to the network for the first time, so it has no producer. |
| unproducedCollections.add(consumedNode); |
| } |
| if (getLocalSideInputNames(transform).contains(consumed.getKey())) { |
| network.addEdge(consumedNode, transformNode, new SingletonEdge()); |
| } else { |
| network.addEdge(consumedNode, transformNode, new PerElementEdge()); |
| } |
| } |
| } |
| checkArgument( |
| unproducedCollections.isEmpty(), |
| "%ss %s were consumed but never produced", |
| PCollectionNode.class.getSimpleName(), |
| unproducedCollections); |
| return network; |
| } |
| |
| public Collection<PTransformNode> getTransforms() { |
| return pipelineNetwork.nodes().stream() |
| .filter(PTransformNode.class::isInstance) |
| .map(PTransformNode.class::cast) |
| .collect(Collectors.toList()); |
| } |
| |
| public Iterable<PTransformNode> getTopologicallyOrderedTransforms() { |
| return StreamSupport.stream( |
| Networks.topologicalOrder(pipelineNetwork, Comparator.comparing(PipelineNode::getId)) |
| .spliterator(), |
| false) |
| .filter(PTransformNode.class::isInstance) |
| .map(PTransformNode.class::cast) |
| .collect(Collectors.toList()); |
| } |
| |
| /** |
| * Get the transforms that are roots of this {@link QueryablePipeline}. These are all nodes which |
| * have no input {@link PCollection}. |
| */ |
| public Set<PTransformNode> getRootTransforms() { |
| return pipelineNetwork.nodes().stream() |
| .filter(pipelineNode -> pipelineNetwork.inEdges(pipelineNode).isEmpty()) |
| .map(pipelineNode -> (PTransformNode) pipelineNode) |
| .collect(Collectors.toSet()); |
| } |
| |
| public PTransformNode getProducer(PCollectionNode pcollection) { |
| return (PTransformNode) Iterables.getOnlyElement(pipelineNetwork.predecessors(pcollection)); |
| } |
| |
| /** |
| * Get all of the {@link PTransformNode PTransforms} which consume the provided {@link |
| * PCollectionNode} on a per-element basis. |
| * |
| * <p>If a {@link PTransformNode} consumes a {@link PCollectionNode} on a per-element basis one or |
| * more times, it will appear a single time in the result. |
| * |
| * <p>In theory, a transform may consume a single {@link PCollectionNode} in both a per-element |
| * and singleton manner. If this is the case, the transform node is included in the result, as it |
| * does consume the {@link PCollectionNode} on a per-element basis. |
| */ |
| public Set<PTransformNode> getPerElementConsumers(PCollectionNode pCollection) { |
| return pipelineNetwork.successors(pCollection).stream() |
| .filter( |
| consumer -> |
| pipelineNetwork.edgesConnecting(pCollection, consumer).stream() |
| .anyMatch(PipelineEdge::isPerElement)) |
| .map(pipelineNode -> (PTransformNode) pipelineNode) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Same as {@link #getPerElementConsumers(PCollectionNode)}, but returns transforms that consume |
| * the collection as a singleton. |
| */ |
| public Set<PTransformNode> getSingletonConsumers(PCollectionNode pCollection) { |
| return pipelineNetwork.successors(pCollection).stream() |
| .filter( |
| consumer -> |
| pipelineNetwork.edgesConnecting(pCollection, consumer).stream() |
| .anyMatch(edge -> !edge.isPerElement())) |
| .map(pipelineNode -> (PTransformNode) pipelineNode) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Gets each {@link PCollectionNode} that the provided {@link PTransformNode} consumes on a |
| * per-element basis. |
| */ |
| public Set<PCollectionNode> getPerElementInputPCollections(PTransformNode ptransform) { |
| return pipelineNetwork.inEdges(ptransform).stream() |
| .filter(PipelineEdge::isPerElement) |
| .map(edge -> (PCollectionNode) pipelineNetwork.incidentNodes(edge).source()) |
| .collect(Collectors.toSet()); |
| } |
| |
| public Set<PCollectionNode> getOutputPCollections(PTransformNode ptransform) { |
| return pipelineNetwork.successors(ptransform).stream() |
| .map(pipelineNode -> (PCollectionNode) pipelineNode) |
| .collect(Collectors.toSet()); |
| } |
| |
| public Components getComponents() { |
| return components; |
| } |
| |
| /** |
| * Returns the {@link SideInputReference SideInputReferences} that the provided transform consumes |
| * as side inputs. |
| */ |
| public Collection<SideInputReference> getSideInputs(PTransformNode transform) { |
| return getLocalSideInputNames(transform.getTransform()).stream() |
| .map( |
| localName -> { |
| String transformId = transform.getId(); |
| PTransform transformProto = components.getTransformsOrThrow(transformId); |
| String collectionId = transform.getTransform().getInputsOrThrow(localName); |
| PCollection collection = components.getPcollectionsOrThrow(collectionId); |
| return SideInputReference.of( |
| PipelineNode.pTransform(transformId, transformProto), |
| localName, |
| PipelineNode.pCollection(collectionId, collection)); |
| }) |
| .collect(Collectors.toSet()); |
| } |
| |
| public Collection<UserStateReference> getUserStates(PTransformNode transform) { |
| return getLocalUserStateNames(transform.getTransform()).stream() |
| .map( |
| localName -> { |
| String transformId = transform.getId(); |
| PTransform transformProto = components.getTransformsOrThrow(transformId); |
| // Get the main input PCollection id. |
| String collectionId = |
| transform |
| .getTransform() |
| .getInputsOrThrow( |
| Iterables.getOnlyElement( |
| Sets.difference( |
| transform.getTransform().getInputsMap().keySet(), |
| ImmutableSet.builder() |
| .addAll(getLocalSideInputNames(transformProto)) |
| .addAll(getLocalTimerNames(transformProto)) |
| .build()))); |
| PCollection collection = components.getPcollectionsOrThrow(collectionId); |
| return UserStateReference.of( |
| PipelineNode.pTransform(transformId, transformProto), |
| localName, |
| PipelineNode.pCollection(collectionId, collection)); |
| }) |
| .collect(Collectors.toSet()); |
| } |
| |
| public Collection<TimerReference> getTimers(PTransformNode transform) { |
| return getLocalTimerNames(transform.getTransform()).stream() |
| .map( |
| localName -> { |
| String transformId = transform.getId(); |
| PTransform transformProto = components.getTransformsOrThrow(transformId); |
| return TimerReference.of( |
| PipelineNode.pTransform(transformId, transformProto), localName); |
| }) |
| .collect(Collectors.toSet()); |
| } |
| |
| private Set<String> getLocalSideInputNames(PTransform transform) { |
| if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) { |
| try { |
| return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getSideInputsMap().keySet(); |
| } catch (InvalidProtocolBufferException e) { |
| throw new RuntimeException(e); |
| } |
| } else { |
| return Collections.emptySet(); |
| } |
| } |
| |
| private Set<String> getLocalUserStateNames(PTransform transform) { |
| if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) { |
| try { |
| return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getStateSpecsMap().keySet(); |
| } catch (InvalidProtocolBufferException e) { |
| throw new RuntimeException(e); |
| } |
| } else { |
| return Collections.emptySet(); |
| } |
| } |
| |
| private Set<String> getLocalTimerNames(PTransform transform) { |
| if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) { |
| try { |
| return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getTimerSpecsMap().keySet(); |
| } catch (InvalidProtocolBufferException e) { |
| throw new RuntimeException(e); |
| } |
| } else { |
| return Collections.emptySet(); |
| } |
| } |
| |
| public Optional<Environment> getEnvironment(PTransformNode parDo) { |
| return Environments.getEnvironment(parDo.getId(), components); |
| } |
| |
| private interface PipelineEdge { |
| boolean isPerElement(); |
| } |
| |
| private static class PerElementEdge implements PipelineEdge { |
| @Override |
| public boolean isPerElement() { |
| return true; |
| } |
| } |
| |
| private static class SingletonEdge implements PipelineEdge { |
| @Override |
| public boolean isPerElement() { |
| return false; |
| } |
| } |
| } |