| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core.construction.graph; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables.getOnlyElement; |
| import static org.hamcrest.Matchers.anyOf; |
| import static org.hamcrest.Matchers.contains; |
| import static org.hamcrest.Matchers.containsInAnyOrder; |
| import static org.hamcrest.Matchers.emptyIterable; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.hasItems; |
| import static org.hamcrest.Matchers.hasSize; |
| import static org.hamcrest.Matchers.not; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.Assert.assertThat; |
| |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import org.apache.beam.model.pipeline.v1.RunnerApi; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Coder; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Components; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy; |
| import org.apache.beam.runners.core.construction.Environments; |
| import org.apache.beam.runners.core.construction.PTransformTranslation; |
| import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; |
| import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; |
| import org.hamcrest.Matchers; |
| import org.hamcrest.core.AnyOf; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for {@link GreedyPipelineFuser}. */ |
| @RunWith(JUnit4.class) |
| public class GreedyPipelineFuserTest { |
| // Contains the 'go' and 'py' environments, the 'coder'/'windowCoder' coders, the 'ws' windowing strategy, and a default 'impulse' step and output. |
| private Components partialComponents; |
| |
| @Before |
| public void setup() { |
|   // Root transform shared by most tests: an Impulse whose spec carries only its URN. |
|   PTransform impulse = |
|       PTransform.newBuilder() |
|           .setUniqueName("Impulse") |
|           .putOutputs("output", "impulse.out") |
|           .setSpec( |
|               FunctionSpec.newBuilder().setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)) |
|           .build(); |
|   // Both coders are empty placeholders that are referenced elsewhere by id. |
|   Coder emptyCoder = Coder.newBuilder().build(); |
|   WindowingStrategy windowing = |
|       WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build(); |
| |
|   Components.Builder base = Components.newBuilder(); |
|   base.putTransforms("impulse", impulse); |
|   base.putPcollections("impulse.out", pc("impulse.out")); |
|   base.putEnvironments("go", Environments.createDockerEnvironment("go")); |
|   base.putEnvironments("py", Environments.createDockerEnvironment("py")); |
|   base.putCoders("coder", emptyCoder); |
|   base.putCoders("windowCoder", emptyCoder); |
|   base.putWindowingStrategies("ws", windowing); |
|   partialComponents = base.build(); |
| } |
| |
| /** Builds a PCollection named {@code name} that uses the "coder" coder and "ws" windowing strategy ids. */ |
| private static PCollection pc(String name) { |
|   PCollection.Builder collection = PCollection.newBuilder(); |
|   collection.setUniqueName(name); |
|   collection.setCoderId("coder"); |
|   collection.setWindowingStrategyId("ws"); |
|   return collection.build(); |
| } |
| |
| /* |
| * impulse -> .out -> read -> .out -> parDo -> .out -> window -> .out |
| * becomes |
| * (impulse.out) -> read -> read.out -> parDo -> parDo.out -> window |
| */ |
| @Test |
| public void singleEnvironmentBecomesASingleStage() { |
|   // Every SDK-executed transform in this pipeline runs in the "py" environment. |
|   SdkFunctionSpec pyFn = SdkFunctionSpec.newBuilder().setEnvironmentId("py").build(); |
|   FunctionSpec pyParDo = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(ParDoPayload.newBuilder().setDoFn(pyFn).build().toByteString()) |
|           .build(); |
|   FunctionSpec pyWindowInto = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN) |
|           .setPayload(WindowIntoPayload.newBuilder().setWindowFn(pyFn).build().toByteString()) |
|           .build(); |
| |
|   PTransform read = |
|       PTransform.newBuilder() |
|           .setUniqueName("Read") |
|           .putInputs("input", "impulse.out") |
|           .putOutputs("output", "read.out") |
|           .setSpec(pyParDo) |
|           .build(); |
|   PTransform parDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("ParDo") |
|           .putInputs("input", "read.out") |
|           .putOutputs("output", "parDo.out") |
|           .setSpec(pyParDo) |
|           .build(); |
|   PTransform window = |
|       PTransform.newBuilder() |
|           .setUniqueName("Window") |
|           .putInputs("input", "parDo.out") |
|           .putOutputs("output", "window.out") |
|           .setSpec(pyWindowInto) |
|           .build(); |
| |
|   Components components = |
|       partialComponents |
|           .toBuilder() |
|           .putTransforms("read", read) |
|           .putPcollections("read.out", pc("read.out")) |
|           .putTransforms("parDo", parDo) |
|           .putPcollections("parDo.out", pc("parDo.out")) |
|           .putTransforms("window", window) |
|           .putPcollections("window.out", pc("window.out")) |
|           .build(); |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   // Only the impulse stays with the runner; everything else lands in one stage. |
|   PTransformNode impulseNode = |
|       PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")); |
|   assertThat(fused.getRunnerExecutedTransforms(), contains(impulseNode)); |
|   assertThat( |
|       fused.getFusedStages(), |
|       contains( |
|           ExecutableStageMatcher.withInput("impulse.out") |
|               .withNoOutputs() |
|               .withTransforms("read", "parDo", "window"))); |
| } |
| |
| /* |
| * impulse -> .out -> mystery -> .out |
| * \ |
| * -> enigma -> .out |
| * becomes all runner-executed |
| */ |
| @Test |
| public void transformsWithNoEnvironmentBecomeRunnerExecuted() { |
| Components components = |
| partialComponents |
| .toBuilder() |
| .putTransforms( |
| "mystery", |
| PTransform.newBuilder() |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)) |
| .setUniqueName("Mystery") |
| .putInputs("input", "impulse.out") |
| .putOutputs("output", "mystery.out") |
| .build()) |
| .putPcollections("mystery.out", pc("mystery.out")) |
| .putTransforms( |
| "enigma", |
| PTransform.newBuilder() |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)) |
| .setUniqueName("Enigma") |
| .putInputs("input", "impulse.out") |
| .putOutputs("output", "enigma.out") |
| .build()) |
| .putPcollections("enigma.out", pc("enigma.out")) |
| .build(); |
| FusedPipeline fused = |
| GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); |
| |
| assertThat( |
| fused.getRunnerExecutedTransforms(), |
| containsInAnyOrder( |
| PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")), |
| PipelineNode.pTransform("mystery", components.getTransformsOrThrow("mystery")), |
| PipelineNode.pTransform("enigma", components.getTransformsOrThrow("enigma")))); |
| assertThat(fused.getFusedStages(), emptyIterable()); |
| } |
| |
| /* |
| * impulse -> .out -> read -> .out -> groupByKey -> .out -> parDo -> .out |
| * becomes |
| * (impulse.out) -> read -> (read.out) |
| * (groupByKey.out) -> parDo |
| */ |
| @Test |
| public void singleEnvironmentAcrossGroupByKeyMultipleStages() { |
|   // Both ParDos share the "py" environment; the GroupByKey between them has only a URN. |
|   FunctionSpec pyParDo = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload( |
|               ParDoPayload.newBuilder() |
|                   .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py")) |
|                   .build() |
|                   .toByteString()) |
|           .build(); |
|   FunctionSpec groupByKeySpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN) |
|           .build(); |
| |
|   PTransform read = |
|       PTransform.newBuilder() |
|           .setUniqueName("Read") |
|           .putInputs("input", "impulse.out") |
|           .putOutputs("output", "read.out") |
|           .setSpec(pyParDo) |
|           .build(); |
|   PTransform groupByKey = |
|       PTransform.newBuilder() |
|           .setUniqueName("GroupByKey") |
|           .putInputs("input", "read.out") |
|           .putOutputs("output", "groupByKey.out") |
|           .setSpec(groupByKeySpec) |
|           .build(); |
|   PTransform parDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("ParDo") |
|           .putInputs("input", "groupByKey.out") |
|           .putOutputs("output", "parDo.out") |
|           .setSpec(pyParDo) |
|           .build(); |
| |
|   Components components = |
|       partialComponents |
|           .toBuilder() |
|           .putTransforms("read", read) |
|           .putPcollections("read.out", pc("read.out")) |
|           .putTransforms("groupByKey", groupByKey) |
|           .putPcollections("groupByKey.out", pc("groupByKey.out")) |
|           .putTransforms("parDo", parDo) |
|           .putPcollections("parDo.out", pc("parDo.out")) |
|           .build(); |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   PTransformNode impulseNode = |
|       PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")); |
|   PTransformNode groupByKeyNode = |
|       PipelineNode.pTransform("groupByKey", components.getTransformsOrThrow("groupByKey")); |
|   assertThat( |
|       fused.getRunnerExecutedTransforms(), containsInAnyOrder(impulseNode, groupByKeyNode)); |
|   // One stage on each side of the GroupByKey. |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput("impulse.out") |
|               .withOutputs("read.out") |
|               .withTransforms("read"), |
|           ExecutableStageMatcher.withInput("groupByKey.out") |
|               .withNoOutputs() |
|               .withTransforms("parDo"))); |
| } |
| |
| /* |
| * impulse -> .out -> read -> .out --> goTransform -> .out |
| * \ |
| * -> pyTransform -> .out |
| * becomes (impulse.out) -> read -> (read.out) |
| * (read.out) -> goTransform |
| * (read.out) -> pyTransform |
| */ |
| @Test |
| public void multipleEnvironmentsBecomesMultipleStages() { |
|   SdkFunctionSpec pyFn = SdkFunctionSpec.newBuilder().setEnvironmentId("py").build(); |
|   SdkFunctionSpec goFn = SdkFunctionSpec.newBuilder().setEnvironmentId("go").build(); |
|   FunctionSpec pyParDo = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(ParDoPayload.newBuilder().setDoFn(pyFn).build().toByteString()) |
|           .build(); |
|   FunctionSpec goParDo = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(ParDoPayload.newBuilder().setDoFn(goFn).build().toByteString()) |
|           .build(); |
|   FunctionSpec pyWindowInto = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN) |
|           .setPayload(WindowIntoPayload.newBuilder().setWindowFn(pyFn).build().toByteString()) |
|           .build(); |
| |
|   PTransform read = |
|       PTransform.newBuilder() |
|           .setUniqueName("Read") |
|           .putInputs("input", "impulse.out") |
|           .putOutputs("output", "read.out") |
|           .setSpec(pyParDo) |
|           .build(); |
|   // Two consumers of read.out, one per environment. |
|   PTransform goTransform = |
|       PTransform.newBuilder() |
|           .setUniqueName("GoTransform") |
|           .putInputs("input", "read.out") |
|           .putOutputs("output", "go.out") |
|           .setSpec(goParDo) |
|           .build(); |
|   PTransform pyTransform = |
|       PTransform.newBuilder() |
|           .setUniqueName("PyTransform") |
|           .putInputs("input", "read.out") |
|           .putOutputs("output", "py.out") |
|           .setSpec(pyWindowInto) |
|           .build(); |
| |
|   Components components = |
|       partialComponents |
|           .toBuilder() |
|           .putTransforms("read", read) |
|           .putPcollections("read.out", pc("read.out")) |
|           .putTransforms("goTransform", goTransform) |
|           .putPcollections("go.out", pc("go.out")) |
|           .putTransforms("pyTransform", pyTransform) |
|           .putPcollections("py.out", pc("py.out")) |
|           .build(); |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   // Impulse is the runner transform |
|   assertThat(fused.getRunnerExecutedTransforms(), hasSize(1)); |
|   assertThat(fused.getFusedStages(), hasSize(3)); |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput("impulse.out") |
|               .withOutputs("read.out") |
|               .withTransforms("read"), |
|           ExecutableStageMatcher.withInput("read.out") |
|               .withNoOutputs() |
|               .withTransforms("pyTransform"), |
|           ExecutableStageMatcher.withInput("read.out") |
|               .withNoOutputs() |
|               .withTransforms("goTransform"))); |
| } |
| |
| /* |
| * goImpulse -> .out -> goRead -> .out \ -> goParDo -> .out |
| * -> flatten -> .out | |
| * pyImpulse -> .out -> pyRead -> .out / -> pyParDo -> .out |
| * |
| * becomes |
| * (goImpulse.out) -> goRead -> goRead.out -> flatten -> (flatten.out_synthetic0) |
| * (pyImpulse.out) -> pyRead -> pyRead.out -> flatten -> (flatten.out_synthetic1) |
| * flatten.out_synthetic0 & flatten.out_synthetic1 -> synthetic_flatten -> flatten.out |
| * (flatten.out) -> goParDo |
| * (flatten.out) -> pyParDo |
| */ |
| @Test |
| public void flattenWithHeterogenousInputsAndOutputsEntirelyMaterialized() { |
|   // Specs shared by the transforms below. |
|   FunctionSpec impulseSpec = |
|       FunctionSpec.newBuilder().setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN).build(); |
|   FunctionSpec flattenSpec = |
|       FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN).build(); |
|   FunctionSpec pyParDoSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload( |
|               ParDoPayload.newBuilder() |
|                   .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py")) |
|                   .build() |
|                   .toByteString()) |
|           .build(); |
|   FunctionSpec goParDoSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload( |
|               ParDoPayload.newBuilder() |
|                   .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("go")) |
|                   .build() |
|                   .toByteString()) |
|           .build(); |
| |
|   PTransform pyImpulse = |
|       PTransform.newBuilder() |
|           .setUniqueName("PyImpulse") |
|           .putOutputs("output", "pyImpulse.out") |
|           .setSpec(impulseSpec) |
|           .build(); |
|   PTransform pyRead = |
|       PTransform.newBuilder() |
|           .setUniqueName("PyRead") |
|           .putInputs("input", "pyImpulse.out") |
|           .putOutputs("output", "pyRead.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
|   PTransform goImpulse = |
|       PTransform.newBuilder() |
|           .setUniqueName("GoImpulse") |
|           .putOutputs("output", "goImpulse.out") |
|           .setSpec(impulseSpec) |
|           .build(); |
|   PTransform goRead = |
|       PTransform.newBuilder() |
|           .setUniqueName("GoRead") |
|           .putInputs("input", "goImpulse.out") |
|           .putOutputs("output", "goRead.out") |
|           .setSpec(goParDoSpec) |
|           .build(); |
|   PTransform flatten = |
|       PTransform.newBuilder() |
|           .setUniqueName("Flatten") |
|           .putInputs("goReadInput", "goRead.out") |
|           .putInputs("pyReadInput", "pyRead.out") |
|           .putOutputs("output", "flatten.out") |
|           .setSpec(flattenSpec) |
|           .build(); |
|   PTransform pyParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("PyParDo") |
|           .putInputs("input", "flatten.out") |
|           .putOutputs("output", "pyParDo.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
|   PTransform goParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("GoParDo") |
|           .putInputs("input", "flatten.out") |
|           .putOutputs("output", "goParDo.out") |
|           .setSpec(goParDoSpec) |
|           .build(); |
| |
|   Components components = |
|       Components.newBuilder() |
|           .putCoders("coder", Coder.newBuilder().build()) |
|           .putCoders("windowCoder", Coder.newBuilder().build()) |
|           .putWindowingStrategies( |
|               "ws", WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build()) |
|           .putTransforms("pyImpulse", pyImpulse) |
|           .putPcollections("pyImpulse.out", pc("pyImpulse.out")) |
|           .putTransforms("pyRead", pyRead) |
|           .putPcollections("pyRead.out", pc("pyRead.out")) |
|           .putTransforms("goImpulse", goImpulse) |
|           .putPcollections("goImpulse.out", pc("goImpulse.out")) |
|           .putTransforms("goRead", goRead) |
|           .putPcollections("goRead.out", pc("goRead.out")) |
|           .putTransforms("flatten", flatten) |
|           .putPcollections("flatten.out", pc("flatten.out")) |
|           .putTransforms("pyParDo", pyParDo) |
|           .putPcollections("pyParDo.out", pc("pyParDo.out")) |
|           .putTransforms("goParDo", goParDo) |
|           .putPcollections("goParDo.out", pc("goParDo.out")) |
|           .putEnvironments("go", Environments.createDockerEnvironment("go")) |
|           .putEnvironments("py", Environments.createDockerEnvironment("py")) |
|           .build(); |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   assertThat(fused.getRunnerExecutedTransforms(), hasSize(3)); |
|   assertThat( |
|       "The runner should include the impulses for both languages, plus an introduced flatten", |
|       fused.getRunnerExecutedTransforms(), |
|       hasItems( |
|           PipelineNode.pTransform("pyImpulse", components.getTransformsOrThrow("pyImpulse")), |
|           PipelineNode.pTransform("goImpulse", components.getTransformsOrThrow("goImpulse")))); |
| |
|   // Find the runner-executed transform that produces flatten.out; the last match wins. |
|   PTransformNode flattenNode = |
|       fused.getRunnerExecutedTransforms().stream() |
|           .filter( |
|               candidate -> |
|                   getOnlyElement(candidate.getTransform().getOutputsMap().values()) |
|                       .equals("flatten.out")) |
|           .reduce((earlier, later) -> later) |
|           .orElse(null); |
| |
|   assertThat(flattenNode, not(nullValue())); |
|   PTransform runnerFlatten = flattenNode.getTransform(); |
|   assertThat( |
|       runnerFlatten.getSpec().getUrn(), equalTo(PTransformTranslation.FLATTEN_TRANSFORM_URN)); |
|   assertThat(new HashSet<>(runnerFlatten.getInputsMap().values()), hasSize(2)); |
| |
|   // The inputs of the runner flatten are the PCollections the upstream stages must output. |
|   Collection<String> introducedOutputs = runnerFlatten.getInputsMap().values(); |
|   AnyOf<String> anyIntroducedPCollection = |
|       anyOf(introducedOutputs.stream().map(Matchers::equalTo).collect(Collectors.toSet())); |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput("goImpulse.out") |
|               .withOutputs(anyIntroducedPCollection) |
|               .withTransforms("goRead", "flatten"), |
|           ExecutableStageMatcher.withInput("pyImpulse.out") |
|               .withOutputs(anyIntroducedPCollection) |
|               .withTransforms("pyRead", "flatten"), |
|           ExecutableStageMatcher.withInput("flatten.out") |
|               .withNoOutputs() |
|               .withTransforms("goParDo"), |
|           ExecutableStageMatcher.withInput("flatten.out") |
|               .withNoOutputs() |
|               .withTransforms("pyParDo"))); |
| |
|   Set<String> materializedStageOutputs = |
|       fused.getFusedStages().stream() |
|           .flatMap(stage -> stage.getOutputPCollections().stream()) |
|           .map(PCollectionNode::getId) |
|           .collect(Collectors.toSet()); |
|   assertThat( |
|       "All materialized stage outputs should be flattened, and no more", |
|       materializedStageOutputs, |
|       containsInAnyOrder(introducedOutputs.toArray(new String[0]))); |
| } |
| |
| /* |
| * goImpulse -> .out -> goRead -> .out \ |
| * -> flatten -> .out -> goParDo -> .out |
| * pyImpulse -> .out -> pyRead -> .out / |
| * |
| * becomes |
| * (goImpulse.out) -> goRead -> goRead.out -> flatten -> flatten.out -> goParDo |
| * (pyImpulse.out) -> pyRead -> pyRead.out -> flatten -> (flatten.out) |
| * (flatten.out) -> goParDo |
| */ |
| @Test |
| public void flattenWithHeterogeneousInputsSingleEnvOutputPartiallyMaterialized() { |
| Components components = |
| Components.newBuilder() |
| .putCoders("coder", Coder.newBuilder().build()) |
| .putCoders("windowCoder", Coder.newBuilder().build()) |
| .putWindowingStrategies( |
| "ws", WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build()) |
| .putTransforms( |
| "pyImpulse", |
| PTransform.newBuilder() |
| .setUniqueName("PyImpulse") |
| .putOutputs("output", "pyImpulse.out") |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)) |
| .build()) |
| .putPcollections("pyImpulse.out", pc("pyImpulse.out")) |
| .putTransforms( |
| "pyRead", |
| PTransform.newBuilder() |
| .setUniqueName("PyRead") |
| .putInputs("input", "pyImpulse.out") |
| .putOutputs("output", "pyRead.out") |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
| .setPayload( |
| ParDoPayload.newBuilder() |
| .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py")) |
| .build() |
| .toByteString())) |
| .build()) |
| .putPcollections("pyRead.out", pc("pyRead.out")) |
| .putTransforms( |
| "goImpulse", |
| PTransform.newBuilder() |
| .setUniqueName("GoImpulse") |
| .putOutputs("output", "goImpulse.out") |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)) |
| .build()) |
| .putPcollections("goImpulse.out", pc("goImpulse.out")) |
| .putTransforms( |
| "goRead", |
| PTransform.newBuilder() |
| .setUniqueName("GoRead") |
| .putInputs("input", "goImpulse.out") |
| .putOutputs("output", "goRead.out") |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
| .setPayload( |
| ParDoPayload.newBuilder() |
| .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("go")) |
| .build() |
| .toByteString())) |
| .build()) |
| .putPcollections("goRead.out", pc("goRead.out")) |
| .putTransforms( |
| "flatten", |
| PTransform.newBuilder() |
| .setUniqueName("Flatten") |
| .putInputs("goReadInput", "goRead.out") |
| .putInputs("pyReadInput", "pyRead.out") |
| .putOutputs("output", "flatten.out") |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN)) |
| .build()) |
| .putPcollections("flatten.out", pc("flatten.out")) |
| .putTransforms( |
| "goParDo", |
| PTransform.newBuilder() |
| .setUniqueName("GoParDo") |
| .putInputs("input", "flatten.out") |
| .putOutputs("output", "goParDo.out") |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
| .setPayload( |
| ParDoPayload.newBuilder() |
| .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("go")) |
| .build() |
| .toByteString())) |
| .build()) |
| .putPcollections("goParDo.out", pc("goParDo.out")) |
| .putEnvironments("go", Environments.createDockerEnvironment("go")) |
| .putEnvironments("py", Environments.createDockerEnvironment("py")) |
| .build(); |
| FusedPipeline fused = |
| GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); |
| |
| assertThat( |
| fused.getRunnerExecutedTransforms(), |
| containsInAnyOrder( |
| PipelineNode.pTransform("pyImpulse", components.getTransformsOrThrow("pyImpulse")), |
| PipelineNode.pTransform("goImpulse", components.getTransformsOrThrow("goImpulse")))); |
| |
| assertThat( |
| fused.getFusedStages(), |
| containsInAnyOrder( |
| ExecutableStageMatcher.withInput("goImpulse.out") |
| .withNoOutputs() |
| .withTransforms("goRead", "flatten", "goParDo"), |
| ExecutableStageMatcher.withInput("pyImpulse.out") |
| .withOutputs("flatten.out") |
| .withTransforms("pyRead", "flatten"), |
| ExecutableStageMatcher.withInput("flatten.out") |
| .withNoOutputs() |
| .withTransforms("goParDo"))); |
| } |
| |
| /* |
| * impulse -> .out -> flatten -> .out -> read -> .out -> parDo -> .out |
| * becomes |
| * (flatten.out) -> read -> parDo |
| * |
| * Flatten, specifically, doesn't fuse greedily into downstream environments or act as a sibling |
| * to any of those nodes, but the routing is instead handled by the Runner. |
| */ |
| @Test |
| public void flattenAfterNoEnvDoesNotFuse() { |
|   FunctionSpec flattenSpec = |
|       FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN).build(); |
|   FunctionSpec pyParDoSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload( |
|               ParDoPayload.newBuilder() |
|                   .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py").build()) |
|                   .build() |
|                   .toByteString()) |
|           .build(); |
| |
|   // The flatten consumes the impulse output directly. |
|   PTransform flatten = |
|       PTransform.newBuilder() |
|           .setUniqueName("Flatten") |
|           .putInputs("impulseInput", "impulse.out") |
|           .putOutputs("output", "flatten.out") |
|           .setSpec(flattenSpec) |
|           .build(); |
|   PTransform read = |
|       PTransform.newBuilder() |
|           .setUniqueName("Read") |
|           .putInputs("input", "flatten.out") |
|           .putOutputs("output", "read.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
|   PTransform parDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("ParDo") |
|           .putInputs("input", "read.out") |
|           .putOutputs("output", "parDo.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
| |
|   Components components = |
|       partialComponents |
|           .toBuilder() |
|           .putTransforms("flatten", flatten) |
|           .putPcollections("flatten.out", pc("flatten.out")) |
|           .putTransforms("read", read) |
|           .putPcollections("read.out", pc("read.out")) |
|           .putTransforms("parDo", parDo) |
|           .putPcollections("parDo.out", pc("parDo.out")) |
|           .build(); |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   PTransformNode impulseNode = |
|       PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")); |
|   PTransformNode flattenNode = |
|       PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten")); |
|   assertThat( |
|       fused.getRunnerExecutedTransforms(), containsInAnyOrder(impulseNode, flattenNode)); |
|   assertThat( |
|       fused.getFusedStages(), |
|       contains( |
|           ExecutableStageMatcher.withInput("flatten.out") |
|               .withNoOutputs() |
|               .withTransforms("read", "parDo"))); |
| } |
| |
| /* |
| * mainImpulse -> .out -> read -> .out -> leftParDo -> .out |
| * \ -> rightParDo -> .out |
| * ------> sideParDo -> .out |
| * / |
| * sideImpulse -> .out -> sideRead -> .out / |
| * |
| * becomes |
| * (mainImpulse.out) -> read -> (read.out) |
| * (read.out) -> leftParDo |
| * \ |
| * -> rightParDo |
| * (read.out) -> sideParDo |
| * (sideImpulse.out) -> sideRead -> (sideRead.out) |
| */ |
| @Test |
| public void sideInputRootsNewStage() { |
|   // Everything SDK-executed here runs in "py"; only sideParDo declares a side input. |
|   SdkFunctionSpec pyFn = SdkFunctionSpec.newBuilder().setEnvironmentId("py").build(); |
|   FunctionSpec impulseSpec = |
|       FunctionSpec.newBuilder().setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN).build(); |
|   FunctionSpec pyParDoSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(ParDoPayload.newBuilder().setDoFn(pyFn).build().toByteString()) |
|           .build(); |
|   FunctionSpec pySideInputParDoSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload( |
|               ParDoPayload.newBuilder() |
|                   .setDoFn(pyFn) |
|                   .putSideInputs("side", SideInput.getDefaultInstance()) |
|                   .build() |
|                   .toByteString()) |
|           .build(); |
| |
|   PTransform mainImpulse = |
|       PTransform.newBuilder() |
|           .setUniqueName("MainImpulse") |
|           .putOutputs("output", "mainImpulse.out") |
|           .setSpec(impulseSpec) |
|           .build(); |
|   PTransform read = |
|       PTransform.newBuilder() |
|           .setUniqueName("Read") |
|           .putInputs("input", "mainImpulse.out") |
|           .putOutputs("output", "read.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
|   PTransform sideImpulse = |
|       PTransform.newBuilder() |
|           .setUniqueName("SideImpulse") |
|           .putOutputs("output", "sideImpulse.out") |
|           .setSpec(impulseSpec) |
|           .build(); |
|   PTransform sideRead = |
|       PTransform.newBuilder() |
|           .setUniqueName("SideRead") |
|           .putInputs("input", "sideImpulse.out") |
|           .putOutputs("output", "sideRead.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
|   PTransform leftParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("LeftParDo") |
|           .putInputs("main", "read.out") |
|           .putOutputs("output", "leftParDo.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
|   PTransform rightParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("RightParDo") |
|           .putInputs("main", "read.out") |
|           .putOutputs("output", "rightParDo.out") |
|           .setSpec(pyParDoSpec) |
|           .build(); |
|   PTransform sideParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("SideParDo") |
|           .putInputs("main", "read.out") |
|           .putInputs("side", "sideRead.out") |
|           .putOutputs("output", "sideParDo.out") |
|           .setSpec(pySideInputParDoSpec) |
|           .build(); |
| |
|   Components components = |
|       Components.newBuilder() |
|           .putCoders("coder", Coder.newBuilder().build()) |
|           .putCoders("windowCoder", Coder.newBuilder().build()) |
|           .putWindowingStrategies( |
|               "ws", WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build()) |
|           .putTransforms("mainImpulse", mainImpulse) |
|           .putPcollections("mainImpulse.out", pc("mainImpulse.out")) |
|           .putTransforms("read", read) |
|           .putPcollections("read.out", pc("read.out")) |
|           .putTransforms("sideImpulse", sideImpulse) |
|           .putPcollections("sideImpulse.out", pc("sideImpulse.out")) |
|           .putTransforms("sideRead", sideRead) |
|           .putPcollections("sideRead.out", pc("sideRead.out")) |
|           .putTransforms("leftParDo", leftParDo) |
|           .putPcollections("leftParDo.out", pc("leftParDo.out")) |
|           .putTransforms("rightParDo", rightParDo) |
|           .putPcollections("rightParDo.out", pc("rightParDo.out")) |
|           .putTransforms("sideParDo", sideParDo) |
|           .putPcollections("sideParDo.out", pc("sideParDo.out")) |
|           .putEnvironments("py", Environments.createDockerEnvironment("py")) |
|           .build(); |
| |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   PTransformNode mainImpulseNode = |
|       PipelineNode.pTransform("mainImpulse", components.getTransformsOrThrow("mainImpulse")); |
|   PTransformNode sideImpulseNode = |
|       PipelineNode.pTransform("sideImpulse", components.getTransformsOrThrow("sideImpulse")); |
|   assertThat( |
|       fused.getRunnerExecutedTransforms(), |
|       containsInAnyOrder(mainImpulseNode, sideImpulseNode)); |
| |
|   RunnerApi.ExecutableStagePayload.SideInputId sideInputId = |
|       RunnerApi.ExecutableStagePayload.SideInputId.newBuilder() |
|           .setTransformId("sideParDo") |
|           .setLocalName("side") |
|           .build(); |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput("mainImpulse.out") |
|               .withOutputs("read.out") |
|               .withTransforms("read"), |
|           ExecutableStageMatcher.withInput("read.out") |
|               .withNoOutputs() |
|               .withTransforms("leftParDo", "rightParDo"), |
|           ExecutableStageMatcher.withInput("read.out") |
|               .withSideInputs(sideInputId) |
|               .withNoOutputs() |
|               .withTransforms("sideParDo"), |
|           ExecutableStageMatcher.withInput("sideImpulse.out") |
|               .withOutputs("sideRead.out") |
|               .withTransforms("sideRead"))); |
| } |
| |
| /* |
|  * A ParDo whose payload declares a state spec is expected to root a stage of its own rather |
|  * than being fused into the stage of the ParDo directly upstream of it. |
|  * |
|  * impulse -> .out -> parDo -> .out -> stateful -> .out |
|  * becomes |
|  * (impulse.out) -> parDo -> (parDo.out) |
|  * (parDo.out) -> stateful |
|  */ |
| @Test |
| public void statefulParDoRootsStage() { |
|   // Upstream ParDo: reads impulse.out, writes parDo.out, declares no state. |
|   ParDoPayload upstreamPayload = |
|       ParDoPayload.newBuilder() |
|           .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) |
|           .build(); |
|   FunctionSpec upstreamSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(upstreamPayload.toByteString()) |
|           .build(); |
|   PTransform upstreamParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("ParDo") |
|           .putInputs("input", "impulse.out") |
|           .putOutputs("output", "parDo.out") |
|           .setSpec(upstreamSpec) |
|           .build(); |
| |
|   // Downstream ParDo: same environment, but its payload carries a state spec, which |
|   // prevents it from fusing with the upstream ParDo. |
|   ParDoPayload statefulPayload = |
|       ParDoPayload.newBuilder() |
|           .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) |
|           .putStateSpecs("state", StateSpec.getDefaultInstance()) |
|           .build(); |
|   FunctionSpec statefulSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(statefulPayload.toByteString()) |
|           .build(); |
|   PTransform statefulParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("StatefulParDo") |
|           .putInputs("input", "parDo.out") |
|           .putOutputs("output", "stateful.out") |
|           .setSpec(statefulSpec) |
|           .build(); |
| |
|   // Add both ParDos, their outputs and the shared environment to the common fixture. |
|   Components.Builder graph = partialComponents.toBuilder(); |
|   graph.putTransforms("parDo", upstreamParDo); |
|   graph.putPcollections("parDo.out", pc("parDo.out")); |
|   graph.putTransforms("stateful", statefulParDo); |
|   graph.putPcollections("stateful.out", pc("stateful.out")); |
|   graph.putEnvironments("common", Environments.createDockerEnvironment("common")); |
|   Components components = graph.build(); |
| |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   // The impulse is the only transform left for the runner to execute. |
|   assertThat( |
|       fused.getRunnerExecutedTransforms(), |
|       containsInAnyOrder( |
|           PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")))); |
|   // Exactly two stages, split at parDo.out. |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput("impulse.out") |
|               .withOutputs("parDo.out") |
|               .withTransforms("parDo"), |
|           ExecutableStageMatcher.withInput("parDo.out") |
|               .withNoOutputs() |
|               .withTransforms("stateful"))); |
| } |
| |
| /* |
|  * A ParDo whose payload declares a timer spec is expected to root a stage of its own rather |
|  * than being fused into the stage of the ParDo directly upstream of it. |
|  * |
|  * impulse -> .out -> parDo -> .out -> timer -> .out |
|  * becomes |
|  * (impulse.out) -> parDo -> (parDo.out) |
|  * (parDo.out) -> timer |
|  */ |
| @Test |
| public void parDoWithTimerRootsStage() { |
|   // Upstream ParDo: reads impulse.out, writes parDo.out, declares no timers. |
|   ParDoPayload upstreamPayload = |
|       ParDoPayload.newBuilder() |
|           .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) |
|           .build(); |
|   FunctionSpec upstreamSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(upstreamPayload.toByteString()) |
|           .build(); |
|   PTransform upstreamParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("ParDo") |
|           .putInputs("input", "impulse.out") |
|           .putOutputs("output", "parDo.out") |
|           .setSpec(upstreamSpec) |
|           .build(); |
| |
|   // Downstream ParDo: its payload carries a timer spec, which prevents it from fusing with |
|   // the upstream ParDo. timer.out is wired as both an input and an output of the transform. |
|   ParDoPayload timerPayload = |
|       ParDoPayload.newBuilder() |
|           .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) |
|           .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) |
|           .build(); |
|   FunctionSpec timerSpec = |
|       FunctionSpec.newBuilder() |
|           .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|           .setPayload(timerPayload.toByteString()) |
|           .build(); |
|   PTransform timerParDo = |
|       PTransform.newBuilder() |
|           .setUniqueName("TimerParDo") |
|           .putInputs("input", "parDo.out") |
|           .putInputs("timer", "timer.out") |
|           .putOutputs("timer", "timer.out") |
|           .putOutputs("output", "output.out") |
|           .setSpec(timerSpec) |
|           .build(); |
| |
|   // Add both ParDos, their PCollections and the shared environment to the common fixture. |
|   Components.Builder graph = partialComponents.toBuilder(); |
|   graph.putTransforms("parDo", upstreamParDo); |
|   graph.putPcollections("parDo.out", pc("parDo.out")); |
|   graph.putTransforms("timer", timerParDo); |
|   graph.putPcollections("timer.out", pc("timer.out")); |
|   graph.putPcollections("output.out", pc("output.out")); |
|   graph.putEnvironments("common", Environments.createDockerEnvironment("common")); |
|   Components components = graph.build(); |
| |
|   Pipeline pipeline = Pipeline.newBuilder().setComponents(components).build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   // The impulse is the only transform left for the runner to execute. |
|   assertThat( |
|       fused.getRunnerExecutedTransforms(), |
|       containsInAnyOrder( |
|           PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")))); |
|   // Exactly two stages, split at parDo.out. |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput("impulse.out") |
|               .withOutputs("parDo.out") |
|               .withTransforms("parDo"), |
|           ExecutableStageMatcher.withInput("parDo.out") |
|               .withNoOutputs() |
|               .withTransforms("timer"))); |
| } |
| |
| /* |
| * Tests that parDo with state and timers is fused correctly and can be queried |
| * impulse -> .out -> timer -> .out |
| * becomes |
| * (impulse.out) -> timer |
| */ |
| @Test |
| public void parDoWithStateAndTimerRootsStage() { |
| PTransform timerTransform = |
| PTransform.newBuilder() |
| .setUniqueName("TimerParDo") |
| .putInputs("input", "impulse.out") |
| .putInputs("timer", "timer.out") |
| .putOutputs("timer", "timer.out") |
| .putOutputs("output", "output.out") |
| .setSpec( |
| FunctionSpec.newBuilder() |
| .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
| .setPayload( |
| ParDoPayload.newBuilder() |
| .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) |
| .putStateSpecs("state", StateSpec.getDefaultInstance()) |
| .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) |
| .build() |
| .toByteString())) |
| .build(); |
| |
| Components components = |
| partialComponents |
| .toBuilder() |
| .putTransforms("timer", timerTransform) |
| .putPcollections("timer.out", pc("timer.out")) |
| .putPcollections("output.out", pc("output.out")) |
| .putEnvironments("common", Environments.createDockerEnvironment("common")) |
| .build(); |
| |
| FusedPipeline fused = |
| GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); |
| |
| assertThat( |
| fused.getRunnerExecutedTransforms(), |
| containsInAnyOrder( |
| PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")))); |
| assertThat( |
| fused.getFusedStages(), |
| contains( |
| ExecutableStageMatcher.withInput("impulse.out") |
| .withNoOutputs() |
| .withTransforms("timer"))); |
| } |
| |
| /* |
|  * The composite "compositeMultiLang" lists read, goTransform and pyTransform as its |
|  * subtransforms and is registered as a root next to the impulse. The expected stages name |
|  * only the three leaf transforms; the composite itself appears in none of them. |
|  * |
|  * impulse -> .out -> ( read -> .out --> goTransform -> .out ) |
|  *                                   \ |
|  *                                    -> pyTransform -> .out ) |
|  * becomes |
|  * (impulse.out) -> read -> (read.out) |
|  * (read.out) -> goTransform |
|  * (read.out) -> pyTransform |
|  */ |
| @Test |
| public void compositesIgnored() { |
|   // Leaf ParDo in the "py" environment: impulse.out -> read.out. |
|   ParDoPayload readPayload = |
|       ParDoPayload.newBuilder() |
|           .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py")) |
|           .build(); |
|   PTransform read = |
|       PTransform.newBuilder() |
|           .setUniqueName("Read") |
|           .putInputs("input", "impulse.out") |
|           .putOutputs("output", "read.out") |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|                   .setPayload(readPayload.toByteString())) |
|           .build(); |
| |
|   // Leaf ParDo in the "go" environment: read.out -> go.out. |
|   ParDoPayload goPayload = |
|       ParDoPayload.newBuilder() |
|           .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("go")) |
|           .build(); |
|   PTransform goTransform = |
|       PTransform.newBuilder() |
|           .setUniqueName("GoTransform") |
|           .putInputs("input", "read.out") |
|           .putOutputs("output", "go.out") |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|                   .setPayload(goPayload.toByteString())) |
|           .build(); |
| |
|   // Leaf window assignment in the "py" environment: read.out -> py.out. |
|   WindowIntoPayload pyPayload = |
|       WindowIntoPayload.newBuilder() |
|           .setWindowFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py")) |
|           .build(); |
|   PTransform pyTransform = |
|       PTransform.newBuilder() |
|           .setUniqueName("PyTransform") |
|           .putInputs("input", "read.out") |
|           .putOutputs("output", "py.out") |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN) |
|                   .setPayload(pyPayload.toByteString())) |
|           .build(); |
| |
|   // Composite with no spec of its own; it only groups the three leaves above. |
|   PTransform compositeMultiLang = |
|       PTransform.newBuilder() |
|           .setUniqueName("CompositeMultiLang") |
|           .putInputs("input", "impulse.out") |
|           .putOutputs("pyOut", "py.out") |
|           .putOutputs("goOut", "go.out") |
|           .addSubtransforms("read") |
|           .addSubtransforms("goTransform") |
|           .addSubtransforms("pyTransform") |
|           .build(); |
| |
|   Components.Builder graph = partialComponents.toBuilder(); |
|   graph.putTransforms("read", read); |
|   graph.putPcollections("read.out", pc("read.out")); |
|   graph.putTransforms("goTransform", goTransform); |
|   graph.putPcollections("go.out", pc("go.out")); |
|   graph.putTransforms("pyTransform", pyTransform); |
|   graph.putPcollections("py.out", pc("py.out")); |
|   graph.putTransforms("compositeMultiLang", compositeMultiLang); |
|   Components components = graph.build(); |
| |
|   Pipeline pipeline = |
|       Pipeline.newBuilder() |
|           .addRootTransformIds("impulse") |
|           .addRootTransformIds("compositeMultiLang") |
|           .setComponents(components) |
|           .build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   // Impulse is the runner transform |
|   assertThat(fused.getRunnerExecutedTransforms(), hasSize(1)); |
|   assertThat(fused.getFusedStages(), hasSize(3)); |
|   // read.out feeds one stage per consumer: pyTransform and goTransform. |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput("impulse.out") |
|               .withOutputs("read.out") |
|               .withTransforms("read"), |
|           ExecutableStageMatcher.withInput("read.out") |
|               .withNoOutputs() |
|               .withTransforms("pyTransform"), |
|           ExecutableStageMatcher.withInput("read.out") |
|               .withNoOutputs() |
|               .withTransforms("goTransform"))); |
| } |
| |
| /* |
|  * Two independent impulse -> read branches feed one flatten: |
|  * |
|  * impulse1 -> .out -> read1 -> .out \ |
|  *                                    -> flatten -> .out |
|  * impulse2 -> .out -> read2 -> .out / |
|  * |
|  * The flatten is expected to show up in both fused stages (one per impulse output). The copy |
|  * of the flatten kept in each stage's components must not list both reads as inputs: |
|  * collected over the two stages, its inputs are exactly read1.out and read2.out, once each. |
|  */ |
| @Test |
| public void sanitizedTransforms() throws Exception { |
| |
|   PCollection flattenOutput = pc("flatten.out"); |
|   PCollection read1Output = pc("read1.out"); |
|   PCollection read2Output = pc("read2.out"); |
|   PCollection impulse1Output = pc("impulse1.out"); |
|   PCollection impulse2Output = pc("impulse2.out"); |
| |
|   // Payloads shared by the transforms below; both point at the "py" environment. |
|   ParDoPayload pyDoFnPayload = |
|       ParDoPayload.newBuilder() |
|           .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py")) |
|           .build(); |
|   // NOTE(review): Flatten and Impulse carry a WindowIntoPayload here; it is kept as-is, but |
|   // it is unclear whether either URN needs a payload at all -- confirm before removing. |
|   WindowIntoPayload pyWindowIntoPayload = |
|       WindowIntoPayload.newBuilder() |
|           .setWindowFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py")) |
|           .build(); |
| |
|   PTransform flattenTransform = |
|       PTransform.newBuilder() |
|           .setUniqueName("Flatten") |
|           .putInputs(read1Output.getUniqueName(), read1Output.getUniqueName()) |
|           .putInputs(read2Output.getUniqueName(), read2Output.getUniqueName()) |
|           .putOutputs(flattenOutput.getUniqueName(), flattenOutput.getUniqueName()) |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN) |
|                   .setPayload(pyWindowIntoPayload.toByteString())) |
|           .build(); |
| |
|   // read1/read2 use the ParDo URN, so their payload is a ParDoPayload naming the DoFn's |
|   // environment. |
|   PTransform read1Transform = |
|       PTransform.newBuilder() |
|           .setUniqueName("read1") |
|           .putInputs(impulse1Output.getUniqueName(), impulse1Output.getUniqueName()) |
|           .putOutputs(read1Output.getUniqueName(), read1Output.getUniqueName()) |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|                   .setPayload(pyDoFnPayload.toByteString())) |
|           .build(); |
|   PTransform read2Transform = |
|       PTransform.newBuilder() |
|           .setUniqueName("read2") |
|           .putInputs(impulse2Output.getUniqueName(), impulse2Output.getUniqueName()) |
|           .putOutputs(read2Output.getUniqueName(), read2Output.getUniqueName()) |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) |
|                   .setPayload(pyDoFnPayload.toByteString())) |
|           .build(); |
| |
|   PTransform impulse1Transform = |
|       PTransform.newBuilder() |
|           .setUniqueName("impulse1") |
|           .putOutputs(impulse1Output.getUniqueName(), impulse1Output.getUniqueName()) |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN) |
|                   .setPayload(pyWindowIntoPayload.toByteString())) |
|           .build(); |
|   PTransform impulse2Transform = |
|       PTransform.newBuilder() |
|           .setUniqueName("impulse2") |
|           .putOutputs(impulse2Output.getUniqueName(), impulse2Output.getUniqueName()) |
|           .setSpec( |
|               FunctionSpec.newBuilder() |
|                   .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN) |
|                   .setPayload(pyWindowIntoPayload.toByteString())) |
|           .build(); |
| |
|   // Every transform and PCollection is registered under its unique name. |
|   Components components = |
|       Components.newBuilder() |
|           .putCoders("coder", Coder.newBuilder().build()) |
|           .putCoders("windowCoder", Coder.newBuilder().build()) |
|           .putWindowingStrategies( |
|               "ws", WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build()) |
|           .putEnvironments("py", Environments.createDockerEnvironment("py")) |
|           .putPcollections(flattenOutput.getUniqueName(), flattenOutput) |
|           .putTransforms(flattenTransform.getUniqueName(), flattenTransform) |
|           .putPcollections(read1Output.getUniqueName(), read1Output) |
|           .putTransforms(read1Transform.getUniqueName(), read1Transform) |
|           .putPcollections(read2Output.getUniqueName(), read2Output) |
|           .putTransforms(read2Transform.getUniqueName(), read2Transform) |
|           .putPcollections(impulse1Output.getUniqueName(), impulse1Output) |
|           .putTransforms(impulse1Transform.getUniqueName(), impulse1Transform) |
|           .putPcollections(impulse2Output.getUniqueName(), impulse2Output) |
|           .putTransforms(impulse2Transform.getUniqueName(), impulse2Transform) |
|           .build(); |
|   // Only the impulses and the flatten are listed as roots; the reads are not. |
|   Pipeline pipeline = |
|       Pipeline.newBuilder() |
|           .addRootTransformIds(impulse1Transform.getUniqueName()) |
|           .addRootTransformIds(impulse2Transform.getUniqueName()) |
|           .addRootTransformIds(flattenTransform.getUniqueName()) |
|           .setComponents(components) |
|           .build(); |
|   FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline); |
| |
|   assertThat(fused.getRunnerExecutedTransforms(), hasSize(2)); |
|   assertThat(fused.getFusedStages(), hasSize(2)); |
| |
|   // One stage per impulse output, each holding its read plus the flatten. |
|   assertThat( |
|       fused.getFusedStages(), |
|       containsInAnyOrder( |
|           ExecutableStageMatcher.withInput(impulse1Output.getUniqueName()) |
|               .withTransforms(flattenTransform.getUniqueName(), read1Transform.getUniqueName()), |
|           ExecutableStageMatcher.withInput(impulse2Output.getUniqueName()) |
|               .withTransforms(flattenTransform.getUniqueName(), read2Transform.getUniqueName()))); |
|   // Gather the flatten's inputs as recorded in every stage's own components. If each stage |
|   // kept both inputs there would be four entries here instead of two. |
|   assertThat( |
|       fused.getFusedStages().stream() |
|           .flatMap( |
|               s -> |
|                   s.getComponents().getTransformsOrThrow(flattenTransform.getUniqueName()) |
|                       .getInputsMap().values().stream()) |
|           .collect(Collectors.toList()), |
|       containsInAnyOrder(read1Output.getUniqueName(), read2Output.getUniqueName())); |
| } |
| } |