| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.runners; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.Pipeline.PipelineVisitor; |
| import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior; |
| import org.apache.beam.sdk.annotations.Internal; |
| import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PInput; |
| import org.apache.beam.sdk.values.POutput; |
| import org.apache.beam.sdk.values.PValue; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> |
| * |
| * <p>Captures information about a collection of transformations and their associated {@link |
| * PValue}s. |
| */ |
| @Internal |
| public class TransformHierarchy { |
| private static final Logger LOG = LoggerFactory.getLogger(TransformHierarchy.class); |
| |
| private final Node root; |
| private final Map<Node, PInput> unexpandedInputs; |
| private final Map<POutput, Node> producers; |
| // A map of PValue to the PInput the producing PTransform is applied to |
| private final Map<PValue, PInput> producerInput; |
| // Maintain a stack based on the enclosing nodes |
| private Node current; |
| |
| public TransformHierarchy() { |
| producers = new HashMap<>(); |
| producerInput = new HashMap<>(); |
| unexpandedInputs = new HashMap<>(); |
| root = new Node(); |
| current = root; |
| } |
| |
| /** |
| * Adds the named {@link PTransform} consuming the provided {@link PInput} as a node in this |
| * {@link TransformHierarchy} as a child of the current node, and sets it to be the current node. |
| * |
| * <p>This call should be finished by expanding and recursively calling {@link #pushNode(String, |
| * PInput, PTransform)}, calling {@link #finishSpecifyingInput()}, setting the output with {@link |
| * #setOutput(POutput)}, and ending with a call to {@link #popNode()}. |
| * |
| * @return the added node |
| */ |
| public Node pushNode(String name, PInput input, PTransform<?, ?> transform) { |
| checkNotNull( |
| transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName()); |
| checkNotNull( |
| name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName()); |
| checkNotNull( |
| input, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName()); |
| Node node = new Node(current, transform, name, input); |
| unexpandedInputs.put(node, input); |
| current.addComposite(node); |
| current = node; |
| return current; |
| } |
| |
| @Internal |
| public Node pushFinalizedNode( |
| String name, |
| Map<TupleTag<?>, PValue> inputs, |
| PTransform<?, ?> transform, |
| Map<TupleTag<?>, PValue> outputs) { |
| checkNotNull( |
| transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName()); |
| checkNotNull( |
| name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName()); |
| checkNotNull( |
| inputs, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName()); |
| Node node = new Node(current, transform, name, inputs, outputs); |
| node.finishedSpecifying = true; |
| current.addComposite(node); |
| current = node; |
| return current; |
| } |
| |
| @Internal |
| public Node addFinalizedPrimitiveNode( |
| String name, |
| Map<TupleTag<?>, PValue> inputs, |
| PTransform<?, ?> transform, |
| Map<TupleTag<?>, PValue> outputs) { |
| checkNotNull( |
| transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName()); |
| checkNotNull( |
| name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName()); |
| checkNotNull( |
| inputs, "Inputs must be provided for all %s Nodes", PTransform.class.getSimpleName()); |
| checkNotNull( |
| outputs, "Outputs must be provided for all %s Nodes", PTransform.class.getSimpleName()); |
| Node node = new Node(current, transform, name, inputs, outputs); |
| node.finishedSpecifying = true; |
| for (PValue output : outputs.values()) { |
| producers.put(output, node); |
| } |
| current.addComposite(node); |
| return node; |
| } |
| |
| public Node replaceNode(Node existing, PInput input, PTransform<?, ?> transform) { |
| checkNotNull(existing); |
| checkNotNull(input); |
| checkNotNull(transform); |
| checkState( |
| unexpandedInputs.isEmpty(), |
| "Replacing a node when the graph has an unexpanded input. This is an SDK bug."); |
| Node replacement = |
| new Node(existing.getEnclosingNode(), transform, existing.getFullName(), input); |
| for (PValue output : existing.getOutputs().values()) { |
| Node producer = producers.get(output); |
| boolean producedInExisting = false; |
| do { |
| if (producer.equals(existing)) { |
| producedInExisting = true; |
| } else { |
| producer = producer.getEnclosingNode(); |
| } |
| } while (!producedInExisting && !producer.isRootNode()); |
| if (producedInExisting) { |
| producers.remove(output); |
| LOG.debug( |
| "Removed producer for value {} as it is part of a replaced composite {}", |
| output, |
| existing.getFullName()); |
| } else { |
| LOG.debug("Value {} not produced in existing node {}", output, existing.getFullName()); |
| } |
| } |
| existing.getEnclosingNode().replaceChild(existing, replacement); |
| unexpandedInputs.remove(existing); |
| unexpandedInputs.put(replacement, input); |
| current = replacement; |
| return replacement; |
| } |
| |
| /** |
| * Finish specifying all of the input {@link PValue PValues} of the current {@link Node}. Ensures |
| * that all of the inputs to the current node have been fully specified, and have been produced by |
| * a node in this graph. |
| */ |
| public void finishSpecifyingInput() { |
| // Inputs must be completely specified before they are consumed by a transform. |
| for (PValue inputValue : current.getInputs().values()) { |
| PInput input = producerInput.remove(inputValue); |
| Node producerNode = maybeGetProducer(inputValue); |
| if (producerNode != null) { |
| inputValue.finishSpecifying(input, producerNode.getTransform()); |
| } |
| } |
| } |
| |
| /** |
| * Set the output of the current {@link Node}. If the output is new (setOutput has not previously |
| * been called with it as the parameter), the current node is set as the producer of that {@link |
| * POutput}. |
| * |
| * <p>Also validates the output - specifically, a Primitive {@link PTransform} produces all of its |
| * outputs, and a Composite {@link PTransform} produces none of its outputs. Verifies that the |
| * expanded output does not contain {@link PValue PValues} produced by both this node and other |
| * nodes. |
| */ |
| public void setOutput(POutput output) { |
| for (PCollection<?> value : fullyExpand(output).values()) { |
| if (!producers.containsKey(value)) { |
| producers.put(value, current); |
| value.finishSpecifyingOutput( |
| current.getFullName(), unexpandedInputs.get(current), current.transform); |
| } |
| producerInput.put(value, unexpandedInputs.get(current)); |
| } |
| output.finishSpecifyingOutput( |
| current.getFullName(), unexpandedInputs.get(current), current.transform); |
| current.setOutput(output); |
| } |
| |
| /** |
| * Recursively replace the outputs of the current {@link Node} with the original outputs of the |
| * node it is replacing. No value that is a key in {@code originalToReplacement} may be present |
| * within the {@link TransformHierarchy} after this method completes. |
| */ |
| public void replaceOutputs(Map<PValue, ReplacementOutput> originalToReplacement) { |
| current.replaceOutputs(originalToReplacement); |
| } |
| |
| /** |
| * Pops the current node off the top of the stack, finishing it. Outputs of the node are finished |
| * once they are consumed as input. |
| */ |
| public void popNode() { |
| current.finishSpecifying(); |
| unexpandedInputs.remove(current); |
| current = current.getEnclosingNode(); |
| checkState(current != null, "Can't pop the root node of a TransformHierarchy"); |
| } |
| |
| Node maybeGetProducer(PValue produced) { |
| return producers.get(produced); |
| } |
| |
| Node getProducer(PValue produced) { |
| return checkNotNull(maybeGetProducer(produced), "No producer found for %s", produced); |
| } |
| |
| public Set<PValue> visit(PipelineVisitor visitor) { |
| finishSpecifying(); |
| Set<PValue> visitedValues = new HashSet<>(); |
| root.visit(visitor, visitedValues, new HashSet<>(), new HashSet<>()); |
| return visitedValues; |
| } |
| |
| /** |
| * Finish specifying any remaining nodes within the {@link TransformHierarchy}. These are {@link |
| * PValue PValues} that are produced as output of some {@link PTransform} but are never consumed |
| * as input. These values must still be finished specifying. |
| */ |
| private void finishSpecifying() { |
| for (Entry<PValue, PInput> producerInputEntry : producerInput.entrySet()) { |
| PValue value = producerInputEntry.getKey(); |
| value.finishSpecifying(producerInputEntry.getValue(), getProducer(value).getTransform()); |
| } |
| producerInput.clear(); |
| } |
| |
| public Node getCurrent() { |
| return current; |
| } |
| |
| private Map<TupleTag<?>, PCollection<?>> fullyExpand(POutput output) { |
| Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>(); |
| for (Map.Entry<TupleTag<?>, PValue> value : output.expand().entrySet()) { |
| if (value.getValue() instanceof PCollection) { |
| PCollection<?> previous = result.put(value.getKey(), (PCollection<?>) value.getValue()); |
| checkArgument( |
| previous == null, |
| "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", |
| output, |
| TupleTag.class.getSimpleName(), |
| value.getKey(), |
| previous, |
| value.getValue()); |
| } else { |
| if (value.getValue().expand().size() == 1 |
| && Iterables.getOnlyElement(value.getValue().expand().values()) |
| .equals(value.getValue())) { |
| throw new IllegalStateException( |
| String.format( |
| "Non %s %s that expands into itself %s", |
| PCollection.class.getSimpleName(), |
| PValue.class.getSimpleName(), |
| value.getValue())); |
| } |
| for (Map.Entry<TupleTag<?>, PCollection<?>> valueComponent : |
| fullyExpand(value.getValue()).entrySet()) { |
| PCollection<?> previous = result.put(valueComponent.getKey(), valueComponent.getValue()); |
| checkArgument( |
| previous == null, |
| "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", |
| output, |
| TupleTag.class.getSimpleName(), |
| valueComponent.getKey(), |
| previous, |
| valueComponent.getValue()); |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Provides internal tracking of transform relationships with helper methods for initialization |
| * and ordered visitation. |
| */ |
| public class Node { |
| // null for the root node, otherwise the enclosing node |
| @Nullable private final Node enclosingNode; |
| |
| // The PTransform for this node, which may be a composite PTransform. |
| // The root of a TransformHierarchy is represented as a Node |
| // with a null transform field. |
| @Nullable private final PTransform<?, ?> transform; |
| |
| private final String fullName; |
| |
| // Nodes for sub-transforms of a composite transform. |
| private final List<Node> parts = new ArrayList<>(); |
| |
| // Input to the transform, in expanded form. |
| private final Map<TupleTag<?>, PValue> inputs; |
| |
| // TODO: track which outputs need to be exported to parent. |
| // Output of the transform, in expanded form. Null if not yet set. |
| @Nullable private Map<TupleTag<?>, PValue> outputs; |
| |
| @VisibleForTesting boolean finishedSpecifying = false; |
| |
| /** |
| * Creates the root-level node. The root level node has a null enclosing node, a null transform, |
| * an empty map of inputs, an empty map of outputs, and a name equal to the empty string. |
| */ |
| private Node() { |
| this.enclosingNode = null; |
| this.transform = null; |
| this.fullName = ""; |
| this.inputs = Collections.emptyMap(); |
| this.outputs = Collections.emptyMap(); |
| } |
| |
| /** |
| * Creates a new Node with the given parent and transform. |
| * |
| * @param enclosingNode the composite node containing this node |
| * @param transform the PTransform tracked by this node |
| * @param fullName the fully qualified name of the transform |
| * @param input the unexpanded input to the transform |
| */ |
| private Node(Node enclosingNode, PTransform<?, ?> transform, String fullName, PInput input) { |
| this.enclosingNode = enclosingNode; |
| this.transform = transform; |
| this.fullName = fullName; |
| ImmutableMap.Builder<TupleTag<?>, PValue> inputs = ImmutableMap.builder(); |
| inputs.putAll(input.expand()); |
| inputs.putAll(transform.getAdditionalInputs()); |
| this.inputs = inputs.build(); |
| } |
| |
| /** |
| * Creates a new {@link Node} with the given parent and transform, where inputs and outputs are |
| * already known. |
| * |
| * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other |
| * nodes. |
| * |
| * @param enclosingNode the composite node containing this node |
| * @param transform the PTransform tracked by this node |
| * @param fullName the fully qualified name of the transform |
| * @param inputs the expanded inputs to the transform |
| * @param outputs the expanded outputs of the transform |
| */ |
| private Node( |
| @Nullable Node enclosingNode, |
| @Nullable PTransform<?, ?> transform, |
| String fullName, |
| @Nullable Map<TupleTag<?>, PValue> inputs, |
| @Nullable Map<TupleTag<?>, PValue> outputs) { |
| this.enclosingNode = enclosingNode; |
| this.transform = transform; |
| this.fullName = fullName; |
| this.inputs = inputs == null ? Collections.emptyMap() : inputs; |
| this.outputs = outputs == null ? Collections.emptyMap() : outputs; |
| } |
| |
| /** |
| * Returns the transform associated with this transform node. |
| * |
| * @return {@code null} if and only if this is the root node of the graph, which has no |
| * associated transform |
| */ |
| @Nullable |
| public PTransform<?, ?> getTransform() { |
| return transform; |
| } |
| |
| /** Returns the enclosing composite transform node, or null if there is none. */ |
| public Node getEnclosingNode() { |
| return enclosingNode; |
| } |
| |
| /** |
| * Adds a composite operation to the transform node. |
| * |
| * <p>As soon as a node is added, the transform node is considered a composite operation instead |
| * of a primitive transform. |
| */ |
| public void addComposite(Node node) { |
| parts.add(node); |
| } |
| |
| /** |
| * Replace a sub-node of this transform node. |
| * |
| * <p>The existing {@link Node} must be a direct child of this {@link Node}. The replacement |
| * node need not be fully specified. |
| */ |
| public void replaceChild(Node existing, Node replacement) { |
| checkNotNull(existing); |
| checkNotNull(replacement); |
| int existingIndex = parts.indexOf(existing); |
| checkArgument( |
| existingIndex >= 0, |
| "Tried to replace a node %s that doesn't exist as a component of node %s", |
| existing.getFullName(), |
| getFullName()); |
| LOG.debug( |
| "Replaced original node {} with replacement {} at index {}", |
| existing, |
| replacement, |
| existingIndex); |
| parts.set(existingIndex, replacement); |
| } |
| |
| /** |
| * Returns true if this node represents a composite transform that does not perform processing |
| * of its own, but merely encapsulates a sub-pipeline (which may be empty). |
| * |
| * <p>Note that a node may be composite with no sub-transforms if it returns its input directly |
| * extracts a component of a tuple, or other operations that occur at pipeline assembly time. |
| */ |
| public boolean isCompositeNode() { |
| return !parts.isEmpty() || isRootNode() || returnsOthersOutput(); |
| } |
| |
| private boolean returnsOthersOutput() { |
| PTransform<?, ?> transform = getTransform(); |
| if (outputs != null) { |
| for (PValue outputValue : outputs.values()) { |
| if (!getProducer(outputValue).getTransform().equals(transform)) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| public boolean isRootNode() { |
| return transform == null; |
| } |
| |
| public String getFullName() { |
| return fullName; |
| } |
| |
| /** Returns the transform input, in fully expanded form. */ |
| public Map<TupleTag<?>, PValue> getInputs() { |
| return inputs; |
| } |
| |
| /** Adds an output to the transform node. */ |
| private void setOutput(POutput output) { |
| checkState(!finishedSpecifying); |
| checkState( |
| this.outputs == null, "Tried to specify more than one output for %s", getFullName()); |
| checkNotNull(output, "Tried to set the output of %s to null", getFullName()); |
| this.outputs = output.expand(); |
| |
| // Validate that a primitive transform produces only primitive output, and a composite |
| // transform does not produce primitive output. |
| Set<Node> outputProducers = new HashSet<>(); |
| for (PValue outputValue : output.expand().values()) { |
| outputProducers.add(getProducer(outputValue)); |
| } |
| if (outputProducers.contains(this) && (!parts.isEmpty() || outputProducers.size() > 1)) { |
| Set<String> otherProducerNames = new HashSet<>(); |
| for (Node outputProducer : outputProducers) { |
| if (outputProducer != this) { |
| otherProducerNames.add(outputProducer.getFullName()); |
| } |
| } |
| throw new IllegalArgumentException( |
| String.format( |
| "Output of composite transform [%s] contains a primitive %s produced by it. " |
| + "Only primitive transforms are permitted to produce primitive outputs." |
| + "%n Outputs: %s" |
| + "%n Other Producers: %s" |
| + "%n Components: %s", |
| getFullName(), |
| POutput.class.getSimpleName(), |
| output.expand(), |
| otherProducerNames, |
| parts)); |
| } |
| } |
| |
| /** |
| * Replaces each value in {@code originalToReplacement} present in this {@link Node Node's} |
| * outputs with the key that maps to that value. |
| * |
| * <p>Outputs produced by this node that do not have an associated key in this map remain as-is. |
| * Generally, these are nodes which are internal to a composite replacement; they should not |
| * appear as outputs of the replacement composite. |
| * |
| * @param originalToReplacement A map from the outputs of the replacement {@link Node} to the |
| * original output. |
| */ |
| void replaceOutputs(Map<PValue, ReplacementOutput> originalToReplacement) { |
| checkNotNull(this.outputs, "Outputs haven't been specified for node %s yet", getFullName()); |
| for (Node component : this.parts) { |
| // Replace the outputs of the component nodes |
| component.replaceOutputs(originalToReplacement); |
| } |
| ImmutableMap.Builder<TupleTag<?>, PValue> newOutputsBuilder = ImmutableMap.builder(); |
| for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { |
| ReplacementOutput mapping = originalToReplacement.get(output.getValue()); |
| if (mapping != null) { |
| if (this.equals(producers.get(mapping.getReplacement().getValue()))) { |
| // This Node produced the replacement PCollection. The structure of this if statement |
| // requires the replacement transform to produce only new outputs; otherwise the |
| // producers map will not be appropriately updated. TODO: investigate alternatives |
| producerInput.remove(mapping.getReplacement().getValue()); |
| // original and replacement might be identical |
| producers.remove(mapping.getReplacement().getValue()); |
| producers.put(mapping.getOriginal().getValue(), this); |
| } |
| LOG.debug( |
| "Replacing output {} with original {}", |
| mapping.getReplacement(), |
| mapping.getOriginal()); |
| newOutputsBuilder.put(output.getKey(), mapping.getOriginal().getValue()); |
| } else { |
| newOutputsBuilder.put(output); |
| } |
| } |
| ImmutableMap<TupleTag<?>, PValue> newOutputs = newOutputsBuilder.build(); |
| checkState( |
| outputs.size() == newOutputs.size(), |
| "Number of outputs must be stable across replacement"); |
| this.outputs = newOutputs; |
| } |
| |
| /** Returns the transform output, in expanded form. */ |
| public Map<TupleTag<?>, PValue> getOutputs() { |
| return outputs == null ? Collections.emptyMap() : outputs; |
| } |
| |
| /** Returns the {@link AppliedPTransform} representing this {@link Node}. */ |
| public AppliedPTransform<?, ?, ?> toAppliedPTransform(Pipeline pipeline) { |
| return AppliedPTransform.of( |
| getFullName(), inputs, outputs, (PTransform) getTransform(), pipeline); |
| } |
| |
| /** |
| * Visit the transform node. |
| * |
| * <p>The visit proceeds in the following order: |
| * |
| * <ul> |
| * <li>Visit all input {@link PValue PValues} returned by the flattened expansion of {@link |
| * Node#getInputs()}. |
| * <li>If the node is a composite: |
| * <ul> |
| * <li>Enter the node via {@link PipelineVisitor#enterCompositeTransform(Node)}. |
| * <li>If the result of {@link PipelineVisitor#enterCompositeTransform(Node)} was {@link |
| * CompositeBehavior#ENTER_TRANSFORM}, visit each child node of this {@link Node}. |
| * <li>Leave the node via {@link PipelineVisitor#leaveCompositeTransform(Node)}. |
| * </ul> |
| * <li>If the node is a primitive, visit it via {@link |
| * PipelineVisitor#visitPrimitiveTransform(Node)}. |
| * <li>Visit each {@link PValue} that was output by this node. |
| * </ul> |
| * |
| * <p>Additionally, the following ordering restrictions are observed: |
| * |
| * <ul> |
| * <li>A {@link Node} will be visited after its enclosing node has been entered and before its |
| * enclosing node has been left |
| * <li>A {@link Node} will not be visited if any enclosing {@link Node} has returned {@link |
| * CompositeBehavior#DO_NOT_ENTER_TRANSFORM} from the call to {@link |
| * PipelineVisitor#enterCompositeTransform(Node)}. |
| * <li>A {@link PValue} will only be visited after the {@link Node} that originally produced |
| * it has been visited. |
| * </ul> |
| * |
| * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for |
| * composite transforms), then the output values. |
| */ |
| private void visit( |
| PipelineVisitor visitor, |
| Set<PValue> visitedValues, |
| Set<Node> visitedNodes, |
| Set<Node> skippedComposites) { |
| if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) { |
| // Recursively enter all enclosing nodes, as appropriate. |
| getEnclosingNode().visit(visitor, visitedValues, visitedNodes, skippedComposites); |
| } |
| // These checks occur after visiting the enclosing node to ensure that if this node has been |
| // visited while visiting the enclosing node the node is not revisited, or, if an enclosing |
| // Node is skipped, this node is also skipped. |
| if (!visitedNodes.add(this)) { |
| LOG.debug("Not revisiting previously visited node {}", this); |
| return; |
| } else if (childNodeOf(skippedComposites)) { |
| // This node is a child of a node that has been passed over via CompositeBehavior, and |
| // should also be skipped. All child nodes of a skipped composite should always be skipped. |
| LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this); |
| return; |
| } |
| |
| if (!finishedSpecifying) { |
| finishSpecifying(); |
| } |
| |
| if (!isRootNode()) { |
| // Visit inputs. |
| for (PValue inputValue : inputs.values()) { |
| Node valueProducer = maybeGetProducer(inputValue); |
| if (valueProducer != null) { |
| if (!visitedNodes.contains(valueProducer)) { |
| valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites); |
| } |
| if (visitedValues.add(inputValue)) { |
| LOG.debug("Visiting input value {}", inputValue); |
| visitor.visitValue(inputValue, valueProducer); |
| } |
| } |
| } |
| } |
| |
| if (isCompositeNode()) { |
| LOG.debug("Visiting composite node {}", this); |
| PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this); |
| |
| if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) { |
| for (Node child : parts) { |
| child.visit(visitor, visitedValues, visitedNodes, skippedComposites); |
| } |
| } else { |
| skippedComposites.add(this); |
| } |
| visitor.leaveCompositeTransform(this); |
| } else { |
| LOG.debug("Visiting primitive node {}", this); |
| visitor.visitPrimitiveTransform(this); |
| } |
| |
| if (!isRootNode()) { |
| checkNotNull(outputs, "Outputs for non-root node %s are null", getFullName()); |
| // Visit outputs. |
| for (PValue pValue : outputs.values()) { |
| if (visitedValues.add(pValue)) { |
| LOG.debug("Visiting output value {}", pValue); |
| visitor.visitValue(pValue, this); |
| } |
| } |
| } |
| } |
| |
| private boolean childNodeOf(Set<Node> nodes) { |
| if (isRootNode()) { |
| return false; |
| } |
| Node parent = this.getEnclosingNode(); |
| while (!parent.isRootNode() && !nodes.contains(parent)) { |
| parent = parent.getEnclosingNode(); |
| } |
| return nodes.contains(parent); |
| } |
| |
| /** |
| * Finish specifying a transform. |
| * |
| * <p>All inputs are finished first, then the transform, then all outputs. |
| */ |
| private void finishSpecifying() { |
| if (finishedSpecifying) { |
| return; |
| } |
| finishedSpecifying = true; |
| } |
| |
| @Override |
| public String toString() { |
| if (isRootNode()) { |
| return "RootNode"; |
| } |
| return MoreObjects.toStringHelper(getClass()).add("fullName", fullName).toString(); |
| } |
| } |
| } |