| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.runners; |
| |
| import static org.hamcrest.Matchers.containsInAnyOrder; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.hasItem; |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.not; |
| import static org.junit.Assert.assertThat; |
| |
| import java.io.Serializable; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import org.apache.beam.sdk.Pipeline.PipelineVisitor; |
| import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.coders.VarIntCoder; |
| import org.apache.beam.sdk.coders.VarLongCoder; |
| import org.apache.beam.sdk.io.CountingSource; |
| import org.apache.beam.sdk.io.GenerateSequence; |
| import org.apache.beam.sdk.io.Read; |
| import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; |
| import org.apache.beam.sdk.runners.TransformHierarchy.Node; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.ParDo.MultiOutput; |
| import org.apache.beam.sdk.transforms.ParDo.SingleOutput; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollection.IsBounded; |
| import org.apache.beam.sdk.values.PCollectionList; |
| import org.apache.beam.sdk.values.PCollectionTuple; |
| import org.apache.beam.sdk.values.PDone; |
| import org.apache.beam.sdk.values.PInput; |
| import org.apache.beam.sdk.values.POutput; |
| import org.apache.beam.sdk.values.PValue; |
| import org.apache.beam.sdk.values.TaggedPValue; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.TupleTagList; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.hamcrest.Matchers; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for {@link TransformHierarchy}. */ |
| @RunWith(JUnit4.class) |
| public class TransformHierarchyTest implements Serializable { |
| @Rule |
| public final transient TestPipeline pipeline = |
| TestPipeline.create().enableAbandonedNodeEnforcement(false); |
| |
| @Rule public transient ExpectedException thrown = ExpectedException.none(); |
| private transient TransformHierarchy hierarchy; |
| |
| @Before |
| public void setup() { |
| hierarchy = new TransformHierarchy(); |
| } |
| |
| @Test |
| public void getCurrentNoPushReturnsRoot() { |
| assertThat(hierarchy.getCurrent().isRootNode(), is(true)); |
| } |
| |
| @Test |
| public void pushWithoutPushFails() { |
| thrown.expect(IllegalStateException.class); |
| hierarchy.popNode(); |
| } |
| |
| @Test |
| public void pushThenPopSucceeds() { |
| TransformHierarchy.Node root = hierarchy.getCurrent(); |
| TransformHierarchy.Node node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1)); |
| assertThat(hierarchy.getCurrent(), equalTo(node)); |
| hierarchy.popNode(); |
| assertThat(node.finishedSpecifying, is(true)); |
| assertThat(hierarchy.getCurrent(), equalTo(root)); |
| } |
| |
| @Test |
| public void emptyCompositeSucceeds() { |
| PCollection<Long> created = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of()); |
| TransformHierarchy.Node node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1)); |
| hierarchy.setOutput(created); |
| hierarchy.popNode(); |
| PCollectionList<Long> pcList = PCollectionList.of(created); |
| |
| TransformHierarchy.Node emptyTransform = |
| hierarchy.pushNode( |
| "Extract", |
| pcList, |
| new PTransform<PCollectionList<Long>, PCollection<Long>>() { |
| @Override |
| public PCollection<Long> expand(PCollectionList<Long> input) { |
| return input.get(0); |
| } |
| }); |
| hierarchy.setOutput(created); |
| hierarchy.popNode(); |
| assertThat(hierarchy.getProducer(created), equalTo(node)); |
| assertThat( |
| "A Transform that produces non-primitive output should be composite", |
| emptyTransform.isCompositeNode(), |
| is(true)); |
| } |
| |
| @Test |
| public void producingOwnAndOthersOutputsFails() { |
| PCollection<Long> created = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of()); |
| hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1)); |
| hierarchy.setOutput(created); |
| hierarchy.popNode(); |
| PCollectionList<Long> pcList = PCollectionList.of(created); |
| |
| final PCollectionList<Long> appended = |
| pcList.and( |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, |
| WindowingStrategy.globalDefault(), |
| IsBounded.BOUNDED, |
| VarLongCoder.of()) |
| .setName("prim")); |
| hierarchy.pushNode( |
| "AddPc", |
| pcList, |
| new PTransform<PCollectionList<Long>, PCollectionList<Long>>() { |
| @Override |
| public PCollectionList<Long> expand(PCollectionList<Long> input) { |
| return appended; |
| } |
| }); |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("contains a primitive POutput produced by it"); |
| thrown.expectMessage("AddPc"); |
| thrown.expectMessage("Create"); |
| thrown.expectMessage(appended.expand().toString()); |
| hierarchy.setOutput(appended); |
| } |
| |
| @Test |
| public void producingOwnOutputWithCompositeFails() { |
| final PCollection<Long> comp = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of()); |
| PTransform<PBegin, PCollection<Long>> root = |
| new PTransform<PBegin, PCollection<Long>>() { |
| @Override |
| public PCollection<Long> expand(PBegin input) { |
| return comp; |
| } |
| }; |
| hierarchy.pushNode("Composite", PBegin.in(pipeline), root); |
| |
| Create.Values<Integer> create = Create.of(1); |
| hierarchy.pushNode("Create", PBegin.in(pipeline), create); |
| hierarchy.setOutput(pipeline.apply(create)); |
| hierarchy.popNode(); |
| |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("contains a primitive POutput produced by it"); |
| thrown.expectMessage("primitive transforms are permitted to produce"); |
| thrown.expectMessage("Composite"); |
| hierarchy.setOutput(comp); |
| } |
| |
| @Test |
| public void replaceSucceeds() { |
| PTransform<?, ?> enclosingPT = |
| new PTransform<PInput, POutput>() { |
| @Override |
| public POutput expand(PInput input) { |
| return PDone.in(input.getPipeline()); |
| } |
| }; |
| |
| TransformHierarchy.Node enclosing = |
| hierarchy.pushNode("Enclosing", PBegin.in(pipeline), enclosingPT); |
| |
| Create.Values<Long> originalTransform = Create.of(1L); |
| TransformHierarchy.Node original = |
| hierarchy.pushNode("Create", PBegin.in(pipeline), originalTransform); |
| assertThat(hierarchy.getCurrent(), equalTo(original)); |
| PCollection<Long> originalOutput = pipeline.apply(originalTransform); |
| hierarchy.setOutput(originalOutput); |
| hierarchy.popNode(); |
| assertThat(original.finishedSpecifying, is(true)); |
| hierarchy.setOutput(PDone.in(pipeline)); |
| hierarchy.popNode(); |
| |
| assertThat(hierarchy.getCurrent(), not(equalTo(enclosing))); |
| Read.Bounded<Long> replacementTransform = Read.from(CountingSource.upTo(1L)); |
| PCollection<Long> replacementOutput = pipeline.apply(replacementTransform); |
| Node replacement = hierarchy.replaceNode(original, PBegin.in(pipeline), replacementTransform); |
| assertThat(hierarchy.getCurrent(), equalTo(replacement)); |
| hierarchy.setOutput(replacementOutput); |
| |
| TaggedPValue taggedReplacement = TaggedPValue.ofExpandedValue(replacementOutput); |
| Map<PValue, ReplacementOutput> replacementOutputs = |
| Collections.singletonMap( |
| replacementOutput, |
| ReplacementOutput.of(TaggedPValue.ofExpandedValue(originalOutput), taggedReplacement)); |
| hierarchy.replaceOutputs(replacementOutputs); |
| |
| assertThat(replacement.getInputs(), equalTo(original.getInputs())); |
| assertThat(replacement.getEnclosingNode(), equalTo(original.getEnclosingNode())); |
| assertThat(replacement.getEnclosingNode(), equalTo(enclosing)); |
| assertThat(replacement.getTransform(), equalTo(replacementTransform)); |
| // THe tags of the replacement transform are matched to the appropriate PValues of the original |
| assertThat(replacement.getOutputs().keySet(), Matchers.contains(taggedReplacement.getTag())); |
| assertThat(replacement.getOutputs().values(), Matchers.contains(originalOutput)); |
| hierarchy.popNode(); |
| } |
| |
| @Test |
| public void replaceWithCompositeSucceeds() { |
| final SingleOutput<Long, Long> originalParDo = |
| ParDo.of( |
| new DoFn<Long, Long>() { |
| @ProcessElement |
| public void processElement(ProcessContext ctxt) { |
| ctxt.output(ctxt.element() + 1L); |
| } |
| }); |
| |
| GenerateSequence genUpstream = GenerateSequence.from(0); |
| PCollection<Long> upstream = pipeline.apply(genUpstream); |
| PCollection<Long> output = upstream.apply("Original", originalParDo); |
| hierarchy.pushNode("Upstream", pipeline.begin(), genUpstream); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(upstream); |
| hierarchy.popNode(); |
| |
| TransformHierarchy.Node original = hierarchy.pushNode("Original", upstream, originalParDo); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(output); |
| hierarchy.popNode(); |
| |
| final TupleTag<Long> longs = new TupleTag<>(); |
| final MultiOutput<Long, Long> replacementParDo = |
| ParDo.of( |
| new DoFn<Long, Long>() { |
| @ProcessElement |
| public void processElement(ProcessContext ctxt) { |
| ctxt.output(ctxt.element() + 1L); |
| } |
| }) |
| .withOutputTags(longs, TupleTagList.empty()); |
| PTransform<PCollection<Long>, PCollection<Long>> replacementComposite = |
| new PTransform<PCollection<Long>, PCollection<Long>>() { |
| @Override |
| public PCollection<Long> expand(PCollection<Long> input) { |
| return input.apply("Contained", replacementParDo).get(longs); |
| } |
| }; |
| |
| PCollectionTuple replacementOutput = upstream.apply("Contained", replacementParDo); |
| |
| Node compositeNode = hierarchy.replaceNode(original, upstream, replacementComposite); |
| Node replacementParNode = hierarchy.pushNode("Original/Contained", upstream, replacementParDo); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(replacementOutput); |
| hierarchy.popNode(); |
| hierarchy.setOutput(replacementOutput.get(longs)); |
| |
| Entry<TupleTag<?>, PValue> replacementLongs = |
| Iterables.getOnlyElement(replacementOutput.expand().entrySet()); |
| hierarchy.replaceOutputs( |
| Collections.singletonMap( |
| replacementOutput.get(longs), |
| ReplacementOutput.of( |
| TaggedPValue.ofExpandedValue(output), |
| TaggedPValue.of(replacementLongs.getKey(), replacementLongs.getValue())))); |
| |
| assertThat( |
| replacementParNode.getOutputs().keySet(), Matchers.contains(replacementLongs.getKey())); |
| assertThat(replacementParNode.getOutputs().values(), Matchers.contains(output)); |
| assertThat( |
| compositeNode.getOutputs().keySet(), |
| equalTo(replacementOutput.get(longs).expand().keySet())); |
| assertThat(compositeNode.getOutputs().values(), Matchers.contains(output)); |
| hierarchy.popNode(); |
| } |
| |
| @Test |
| public void visitVisitsAllPushed() { |
| TransformHierarchy.Node root = hierarchy.getCurrent(); |
| PBegin begin = PBegin.in(pipeline); |
| |
| Create.Values<Long> create = Create.of(1L); |
| Read.Bounded<Long> read = Read.from(CountingSource.upTo(1L)); |
| |
| PCollection<Long> created = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of()); |
| |
| SingleOutput<Long, Long> pardo = |
| ParDo.of( |
| new DoFn<Long, Long>() { |
| @ProcessElement |
| public void processElement(ProcessContext ctxt) { |
| ctxt.output(ctxt.element()); |
| } |
| }); |
| |
| PCollection<Long> mapped = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of()); |
| |
| TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create); |
| hierarchy.finishSpecifyingInput(); |
| assertThat(hierarchy.getCurrent(), equalTo(compositeNode)); |
| assertThat(compositeNode.getInputs().entrySet(), Matchers.empty()); |
| assertThat(compositeNode.getTransform(), equalTo(create)); |
| // Not yet set |
| assertThat(compositeNode.getOutputs().entrySet(), Matchers.emptyIterable()); |
| assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true)); |
| |
| TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read); |
| assertThat(hierarchy.getCurrent(), equalTo(primitiveNode)); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(created); |
| hierarchy.popNode(); |
| assertThat(primitiveNode.getOutputs().values(), containsInAnyOrder(created)); |
| assertThat(primitiveNode.getInputs().entrySet(), Matchers.emptyIterable()); |
| assertThat(primitiveNode.getTransform(), equalTo(read)); |
| assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode)); |
| |
| hierarchy.setOutput(created); |
| // The composite is listed as outputting a PValue created by the contained primitive |
| assertThat(compositeNode.getOutputs().values(), containsInAnyOrder(created)); |
| // The producer of that PValue is still the primitive in which it is first output |
| assertThat(hierarchy.getProducer(created), equalTo(primitiveNode)); |
| hierarchy.popNode(); |
| |
| TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", created, pardo); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(mapped); |
| hierarchy.popNode(); |
| |
| final Set<TransformHierarchy.Node> visitedCompositeNodes = new HashSet<>(); |
| final Set<TransformHierarchy.Node> visitedPrimitiveNodes = new HashSet<>(); |
| final Set<PValue> visitedValuesInVisitor = new HashSet<>(); |
| |
| Set<PValue> visitedValues = |
| hierarchy.visit( |
| new PipelineVisitor.Defaults() { |
| @Override |
| public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { |
| visitedCompositeNodes.add(node); |
| return CompositeBehavior.ENTER_TRANSFORM; |
| } |
| |
| @Override |
| public void visitPrimitiveTransform(TransformHierarchy.Node node) { |
| visitedPrimitiveNodes.add(node); |
| } |
| |
| @Override |
| public void visitValue(PValue value, TransformHierarchy.Node producer) { |
| visitedValuesInVisitor.add(value); |
| } |
| }); |
| |
| assertThat(visitedCompositeNodes, containsInAnyOrder(root, compositeNode)); |
| assertThat(visitedPrimitiveNodes, containsInAnyOrder(primitiveNode, otherPrimitive)); |
| assertThat(visitedValuesInVisitor, containsInAnyOrder(created, mapped)); |
| assertThat(visitedValuesInVisitor, equalTo(visitedValues)); |
| } |
| |
| /** |
| * Tests that visiting the {@link TransformHierarchy} after replacing nodes does not visit any of |
| * the original nodes or inaccessible values but does visit all of the replacement nodes, new |
| * inaccessible replacement values, and the original output values. |
| */ |
| @Test |
| public void visitAfterReplace() { |
| Node root = hierarchy.getCurrent(); |
| final SingleOutput<Long, Long> originalParDo = |
| ParDo.of( |
| new DoFn<Long, Long>() { |
| @ProcessElement |
| public void processElement(ProcessContext ctxt) { |
| ctxt.output(ctxt.element() + 1L); |
| } |
| }); |
| |
| GenerateSequence genUpstream = GenerateSequence.from(0); |
| PCollection<Long> upstream = pipeline.apply(genUpstream); |
| PCollection<Long> output = upstream.apply("Original", originalParDo); |
| Node upstreamNode = hierarchy.pushNode("Upstream", pipeline.begin(), genUpstream); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(upstream); |
| hierarchy.popNode(); |
| |
| Node original = hierarchy.pushNode("Original", upstream, originalParDo); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(output); |
| hierarchy.popNode(); |
| |
| final TupleTag<Long> longs = new TupleTag<>(); |
| final MultiOutput<Long, Long> replacementParDo = |
| ParDo.of( |
| new DoFn<Long, Long>() { |
| @ProcessElement |
| public void processElement(ProcessContext ctxt) { |
| ctxt.output(ctxt.element() + 1L); |
| } |
| }) |
| .withOutputTags(longs, TupleTagList.empty()); |
| PTransform<PCollection<Long>, PCollection<Long>> replacementComposite = |
| new PTransform<PCollection<Long>, PCollection<Long>>() { |
| @Override |
| public PCollection<Long> expand(PCollection<Long> input) { |
| return input.apply("Contained", replacementParDo).get(longs); |
| } |
| }; |
| |
| PCollectionTuple replacementOutput = upstream.apply("Contained", replacementParDo); |
| |
| Node compositeNode = hierarchy.replaceNode(original, upstream, replacementComposite); |
| Node replacementParNode = hierarchy.pushNode("Original/Contained", upstream, replacementParDo); |
| hierarchy.finishSpecifyingInput(); |
| hierarchy.setOutput(replacementOutput); |
| hierarchy.popNode(); |
| hierarchy.setOutput(replacementOutput.get(longs)); |
| |
| Entry<TupleTag<?>, PValue> replacementLongs = |
| Iterables.getOnlyElement(replacementOutput.expand().entrySet()); |
| hierarchy.replaceOutputs( |
| Collections.singletonMap( |
| replacementOutput.get(longs), |
| ReplacementOutput.of( |
| TaggedPValue.ofExpandedValue(output), |
| TaggedPValue.of(replacementLongs.getKey(), replacementLongs.getValue())))); |
| hierarchy.popNode(); |
| |
| final Set<Node> visitedCompositeNodes = new HashSet<>(); |
| final Set<Node> visitedPrimitiveNodes = new HashSet<>(); |
| Set<PValue> visitedValues = |
| hierarchy.visit( |
| new Defaults() { |
| @Override |
| public CompositeBehavior enterCompositeTransform(Node node) { |
| visitedCompositeNodes.add(node); |
| return CompositeBehavior.ENTER_TRANSFORM; |
| } |
| |
| @Override |
| public void visitPrimitiveTransform(Node node) { |
| visitedPrimitiveNodes.add(node); |
| } |
| }); |
| |
| /* |
| Final Graph: |
| Upstream -> Upstream.out -> Composite -> (ReplacementParDo -> OriginalParDo.out) |
| */ |
| assertThat(visitedCompositeNodes, containsInAnyOrder(root, compositeNode)); |
| assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode)); |
| assertThat(visitedValues, containsInAnyOrder(upstream, output)); |
| } |
| |
| @Test |
| public void visitIsTopologicallyOrdered() { |
| PCollection<String> one = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of()); |
| final PCollection<Integer> two = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of()); |
| final PDone done = PDone.in(pipeline); |
| final TupleTag<String> oneTag = new TupleTag<String>() {}; |
| final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; |
| final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); |
| |
| PTransform<PCollection<String>, PDone> multiConsumer = |
| new PTransform<PCollection<String>, PDone>() { |
| @Override |
| public PDone expand(PCollection<String> input) { |
| return done; |
| } |
| |
| @Override |
| public Map<TupleTag<?>, PValue> getAdditionalInputs() { |
| return Collections.singletonMap(twoTag, two); |
| } |
| }; |
| hierarchy.pushNode("consumes_both", one, multiConsumer); |
| hierarchy.setOutput(done); |
| hierarchy.popNode(); |
| |
| final PTransform<PBegin, PCollectionTuple> producer = |
| new PTransform<PBegin, PCollectionTuple>() { |
| @Override |
| public PCollectionTuple expand(PBegin input) { |
| return oneAndTwo; |
| } |
| }; |
| hierarchy.pushNode( |
| "encloses_producer", |
| PBegin.in(pipeline), |
| new PTransform<PBegin, PCollectionTuple>() { |
| @Override |
| public PCollectionTuple expand(PBegin input) { |
| return input.apply(producer); |
| } |
| }); |
| hierarchy.pushNode("creates_one_and_two", PBegin.in(pipeline), producer); |
| hierarchy.setOutput(oneAndTwo); |
| hierarchy.popNode(); |
| hierarchy.setOutput(oneAndTwo); |
| hierarchy.popNode(); |
| |
| hierarchy.pushNode("second_copy_of_consumes_both", one, multiConsumer); |
| hierarchy.setOutput(done); |
| hierarchy.popNode(); |
| |
| final Set<Node> visitedNodes = new HashSet<>(); |
| final Set<Node> exitedNodes = new HashSet<>(); |
| final Set<PValue> visitedValues = new HashSet<>(); |
| hierarchy.visit( |
| new PipelineVisitor.Defaults() { |
| |
| @Override |
| public CompositeBehavior enterCompositeTransform(Node node) { |
| for (PValue input : node.getInputs().values()) { |
| assertThat(visitedValues, hasItem(input)); |
| } |
| assertThat( |
| "Nodes should not be visited more than once", visitedNodes, not(hasItem(node))); |
| if (!node.isRootNode()) { |
| assertThat( |
| "Nodes should always be visited after their enclosing nodes", |
| visitedNodes, |
| hasItem(node.getEnclosingNode())); |
| } |
| visitedNodes.add(node); |
| return CompositeBehavior.ENTER_TRANSFORM; |
| } |
| |
| @Override |
| public void leaveCompositeTransform(Node node) { |
| assertThat(visitedNodes, hasItem(node)); |
| if (!node.isRootNode()) { |
| assertThat( |
| "Nodes should always be left before their enclosing nodes are left", |
| exitedNodes, |
| not(hasItem(node.getEnclosingNode()))); |
| } |
| assertThat(exitedNodes, not(hasItem(node))); |
| exitedNodes.add(node); |
| } |
| |
| @Override |
| public void visitPrimitiveTransform(Node node) { |
| assertThat(visitedNodes, hasItem(node.getEnclosingNode())); |
| assertThat(exitedNodes, not(hasItem(node.getEnclosingNode()))); |
| assertThat( |
| "Nodes should not be visited more than once", visitedNodes, not(hasItem(node))); |
| for (PValue input : node.getInputs().values()) { |
| assertThat(visitedValues, hasItem(input)); |
| } |
| visitedNodes.add(node); |
| } |
| |
| @Override |
| public void visitValue(PValue value, Node producer) { |
| assertThat(visitedNodes, hasItem(producer)); |
| assertThat(visitedValues, not(hasItem(value))); |
| visitedValues.add(value); |
| } |
| }); |
| assertThat("Should have visited all the nodes", visitedNodes.size(), equalTo(5)); |
| assertThat("Should have left all of the visited composites", exitedNodes.size(), equalTo(2)); |
| } |
| |
| @Test |
| public void visitDoesNotVisitSkippedNodes() { |
| PCollection<String> one = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of()); |
| final PCollection<Integer> two = |
| PCollection.createPrimitiveOutputInternal( |
| pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of()); |
| final PDone done = PDone.in(pipeline); |
| final TupleTag<String> oneTag = new TupleTag<String>() {}; |
| final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; |
| final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); |
| |
| hierarchy.pushNode( |
| "consumes_both", |
| one, |
| new PTransform<PCollection<String>, PDone>() { |
| @Override |
| public PDone expand(PCollection<String> input) { |
| return done; |
| } |
| |
| @Override |
| public Map<TupleTag<?>, PValue> getAdditionalInputs() { |
| return Collections.singletonMap(twoTag, two); |
| } |
| }); |
| hierarchy.setOutput(done); |
| hierarchy.popNode(); |
| |
| final PTransform<PBegin, PCollectionTuple> producer = |
| new PTransform<PBegin, PCollectionTuple>() { |
| @Override |
| public PCollectionTuple expand(PBegin input) { |
| return oneAndTwo; |
| } |
| }; |
| final Node enclosing = |
| hierarchy.pushNode( |
| "encloses_producer", |
| PBegin.in(pipeline), |
| new PTransform<PBegin, PCollectionTuple>() { |
| @Override |
| public PCollectionTuple expand(PBegin input) { |
| return input.apply(producer); |
| } |
| }); |
| Node enclosed = hierarchy.pushNode("creates_one_and_two", PBegin.in(pipeline), producer); |
| hierarchy.setOutput(oneAndTwo); |
| hierarchy.popNode(); |
| hierarchy.setOutput(oneAndTwo); |
| hierarchy.popNode(); |
| |
| final Set<Node> visitedNodes = new HashSet<>(); |
| hierarchy.visit( |
| new PipelineVisitor.Defaults() { |
| @Override |
| public CompositeBehavior enterCompositeTransform(Node node) { |
| visitedNodes.add(node); |
| return node.equals(enclosing) |
| ? CompositeBehavior.DO_NOT_ENTER_TRANSFORM |
| : CompositeBehavior.ENTER_TRANSFORM; |
| } |
| |
| @Override |
| public void visitPrimitiveTransform(Node node) { |
| visitedNodes.add(node); |
| } |
| }); |
| |
| assertThat(visitedNodes, hasItem(enclosing)); |
| assertThat(visitedNodes, not(hasItem(enclosed))); |
| } |
| } |