/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction.graph;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link GreedyStageFuser}. */
@RunWith(JUnit4.class)
public class GreedyStageFuserTest {
@Rule public ExpectedException thrown = ExpectedException.none();
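// The output PCollection of the base "impulse" primitive; most tests use it as the node that
// roots the fused stage.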
private final PCollection impulseDotOut =
PCollection.newBuilder().setUniqueName("impulse.out").build();
private final PCollectionNode impulseOutputNode =
PipelineNode.pCollection("impulse.out", impulseDotOut);
private Components partialComponents;
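// A base Components containing only the impulse primitive and its output PCollection; each test
// extends it with the transforms under test.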
@Before
public void setup() {
partialComponents =
Components.newBuilder()
.putTransforms(
"impulse",
PTransform.newBuilder()
.putOutputs("output", "impulse.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN))
.build())
.putPcollections("impulse.out", impulseDotOut)
.build();
}
@Test
public void noInitialConsumersThrows() {
// (impulse.out) -> () is not a meaningful stage, so attempting to construct it should fail
QueryablePipeline p = QueryablePipeline.forPrimitivesIn(partialComponents);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("at least one PTransform");
GreedyStageFuser.forGrpcPortRead(p, impulseOutputNode, Collections.emptySet());
}
@Test
public void differentEnvironmentsThrows() {
// (impulse.out) -> read -> read.out -> go -> go.out
// read.out -> py -> py.out
// read.out can't be fused with both 'go' and 'py', so we should refuse to create this stage
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms(
"read",
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.build())
.putPcollections(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build())
.putTransforms(
"goTransform",
PTransform.newBuilder()
.putInputs("input", "read.out")
.putOutputs("output", "go.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(
SdkFunctionSpec.newBuilder().setEnvironmentId("go"))
.build()
.toByteString()))
.build())
.putPcollections("go.out", PCollection.newBuilder().setUniqueName("go.out").build())
.putTransforms(
"pyTransform",
PTransform.newBuilder()
.putInputs("input", "read.out")
.putOutputs("output", "py.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(
SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
.build()
.toByteString()))
.build())
.putPcollections("py.out", PCollection.newBuilder().setUniqueName("py.out").build())
.putEnvironments("go", Environments.createDockerEnvironment("go"))
.putEnvironments("py", Environments.createDockerEnvironment("py"))
.build());
Set<PTransformNode> differentEnvironments =
p.getPerElementConsumers(
PipelineNode.pCollection(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build()));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("go");
thrown.expectMessage("py");
thrown.expectMessage("same");
GreedyStageFuser.forGrpcPortRead(
p,
PipelineNode.pCollection(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build()),
differentEnvironments);
}
@Test
public void noEnvironmentThrows() {
// (impulse.out) -> runnerTransform -> gbk.out
// runnerTransform (a GroupByKey) is not executed in an SDK environment, so trying to fuse it
// into a stage should fail
PTransform gbkTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.setSpec(
FunctionSpec.newBuilder().setUrn(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN))
.putOutputs("output", "gbk.out")
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("runnerTransform", gbkTransform)
.putPcollections(
"gbk.out", PCollection.newBuilder().setUniqueName("gbk.out").build())
.build());
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Environment must be populated");
GreedyStageFuser.forGrpcPortRead(
p,
impulseOutputNode,
ImmutableSet.of(PipelineNode.pTransform("runnerTransform", gbkTransform)));
}
@Test
public void fusesCompatibleEnvironments() {
// (impulse.out) -> parDo -> parDo.out -> window -> window.out
// parDo and window both have the environment "common" and can be fused together
PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform windowTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "window.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("parDo", parDoTransform)
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putTransforms("window", windowTransform)
.putPcollections(
"window.out", PCollection.newBuilder().setUniqueName("window.out").build())
.putEnvironments("common", Environments.createDockerEnvironment("common"))
.build());
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p,
impulseOutputNode,
ImmutableSet.of(
PipelineNode.pTransform("parDo", parDoTransform),
PipelineNode.pTransform("window", windowTransform)));
// Nothing consumes the outputs of ParDo or Window, so they don't have to be materialized
assertThat(subgraph.getOutputPCollections(), emptyIterable());
assertThat(subgraph, hasSubtransforms("parDo", "window"));
}
@Test
public void materializesWithStatefulConsumer() {
// (impulse.out) -> parDo -> (parDo.out)
// (parDo.out) -> stateful -> stateful.out
// stateful has a state spec which prevents it from fusing with an upstream ParDo
PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform statefulTransform =
PTransform.newBuilder()
.putInputs("input", "parDo.out")
.putOutputs("output", "stateful.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.putStateSpecs("state", StateSpec.getDefaultInstance())
.build()
.toByteString()))
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("parDo", parDoTransform)
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putTransforms("stateful", statefulTransform)
.putPcollections(
"stateful.out", PCollection.newBuilder().setUniqueName("stateful.out").build())
.putEnvironments("common", Environments.createDockerEnvironment("common"))
.build());
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p,
impulseOutputNode,
ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
assertThat(
subgraph.getOutputPCollections(),
contains(
PipelineNode.pCollection(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())));
assertThat(subgraph, hasSubtransforms("parDo"));
}
@Test
public void materializesWithConsumerWithTimer() {
// (impulse.out) -> parDo -> (parDo.out)
// (parDo.out) -> timer -> timer.out
// timer has a timer spec which prevents it from fusing with an upstream ParDo
PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform timerTransform =
PTransform.newBuilder()
.putInputs("input", "parDo.out")
.putOutputs("output", "timer.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.putTimerSpecs("timer", TimerSpec.getDefaultInstance())
.build()
.toByteString()))
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("parDo", parDoTransform)
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putTransforms("timer", timerTransform)
.putPcollections(
"timer.out", PCollection.newBuilder().setUniqueName("timer.out").build())
.putEnvironments("common", Environments.createDockerEnvironment("common"))
.build());
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p,
impulseOutputNode,
ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
assertThat(
subgraph.getOutputPCollections(),
contains(
PipelineNode.pCollection(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())));
assertThat(subgraph, hasSubtransforms("parDo"));
}
@Test
public void fusesFlatten() {
// (impulse.out) -> parDo -> parDo.out -> flatten -> flatten.out -> window -> window.out
// (impulse.out) -> read -> read.out -> flatten
// The flatten can be executed within the same environment as any transform; the window can
// execute in the same environment as the rest of the transforms, and can fuse with the stage
PTransform readTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform flattenTransform =
PTransform.newBuilder()
.putInputs("readInput", "read.out")
.putInputs("parDoInput", "parDo.out")
.putOutputs("output", "flatten.out")
.setSpec(FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN))
.build();
PTransform windowTransform =
PTransform.newBuilder()
.putInputs("input", "flatten.out")
.putOutputs("output", "window.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("read", readTransform)
.putPcollections(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build())
.putTransforms("parDo", parDoTransform)
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putTransforms("flatten", flattenTransform)
.putPcollections(
"flatten.out", PCollection.newBuilder().setUniqueName("flatten.out").build())
.putTransforms("window", windowTransform)
.putPcollections(
"window.out", PCollection.newBuilder().setUniqueName("window.out").build())
.putEnvironments("common", Environments.createDockerEnvironment("common"))
.build());
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, p.getPerElementConsumers(impulseOutputNode));
assertThat(subgraph.getOutputPCollections(), emptyIterable());
assertThat(subgraph, hasSubtransforms("read", "parDo", "flatten", "window"));
}
@Test
public void fusesFlattenWithDifferentEnvironmentInputs() {
// (impulse.out) -> read -> read.out -> flatten -> flatten.out -> window -> window.out
// (impulse.out) -> envRead -> envRead.out -> flatten
// fuses into
// read -> read.out -> flatten -> flatten.out -> window -> window.out
// envRead -> envRead.out -> flatten -> (flatten.out)
// (flatten.out) -> window -> window.out
PTransform readTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform otherEnvRead =
PTransform.newBuilder()
.putInputs("impulse", "impulse.out")
.putOutputs("output", "envRead.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("rare"))
.build()
.toByteString()))
.build();
PTransform flattenTransform =
PTransform.newBuilder()
.putInputs("readInput", "read.out")
.putInputs("otherEnvInput", "envRead.out")
.putOutputs("output", "flatten.out")
.setSpec(FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN))
.build();
PTransform windowTransform =
PTransform.newBuilder()
.putInputs("input", "flatten.out")
.putOutputs("output", "window.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
Components components =
partialComponents
.toBuilder()
.putTransforms("read", readTransform)
.putPcollections("read.out", PCollection.newBuilder().setUniqueName("read.out").build())
.putTransforms("envRead", otherEnvRead)
.putPcollections(
"envRead.out", PCollection.newBuilder().setUniqueName("envRead.out").build())
.putTransforms("flatten", flattenTransform)
.putPcollections(
"flatten.out", PCollection.newBuilder().setUniqueName("flatten.out").build())
.putTransforms("window", windowTransform)
.putPcollections(
"window.out", PCollection.newBuilder().setUniqueName("window.out").build())
.putEnvironments("common", Environments.createDockerEnvironment("common"))
.putEnvironments("rare", Environments.createDockerEnvironment("rare"))
.build();
QueryablePipeline p = QueryablePipeline.forPrimitivesIn(components);
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, ImmutableSet.of(PipelineNode.pTransform("read", readTransform)));
assertThat(subgraph.getOutputPCollections(), emptyIterable());
assertThat(subgraph, hasSubtransforms("read", "flatten", "window"));
// Flatten appears in both of these subgraphs, but each element reaches the flatten along only
// one of the paths.
ExecutableStage readFromOtherEnv =
GreedyStageFuser.forGrpcPortRead(
p,
impulseOutputNode,
ImmutableSet.of(PipelineNode.pTransform("envRead", otherEnvRead)));
assertThat(
readFromOtherEnv.getOutputPCollections(),
contains(
PipelineNode.pCollection(
"flatten.out", components.getPcollectionsOrThrow("flatten.out"))));
assertThat(readFromOtherEnv, hasSubtransforms("envRead", "flatten"));
}
@Test
public void flattenWithHeterogeneousInputsAndOutputs() {
// (impulse.out) -> pyRead -> pyRead.out -> flatten -> flatten.out -> pyParDo -> pyParDo.out
// (impulse.out) -> goRead -> goRead.out -> flatten -> flatten.out -> goWindow -> goWindow.out
// fuses into
// (impulse.out) -> pyRead -> pyRead.out -> flatten -> (flatten.out)
// (impulse.out) -> goRead -> goRead.out -> flatten -> (flatten.out)
// (flatten.out) -> pyParDo -> pyParDo.out
// (flatten.out) -> goWindow -> goWindow.out
PTransform pyRead =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "pyRead.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
.build()
.toByteString())
.build())
.build();
PTransform goRead =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "goRead.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("go"))
.build()
.toByteString())
.build())
.build();
PTransform pyParDo =
PTransform.newBuilder()
.putInputs("input", "flatten.out")
.putOutputs("output", "pyParDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
.build()
.toByteString())
.build())
.build();
PTransform goWindow =
PTransform.newBuilder()
.putInputs("input", "flatten.out")
.putOutputs("output", "goWindow.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(SdkFunctionSpec.newBuilder().setEnvironmentId("go"))
.build()
.toByteString())
.build())
.build();
PCollection flattenPc = PCollection.newBuilder().setUniqueName("flatten.out").build();
Components components =
partialComponents
.toBuilder()
.putTransforms("pyRead", pyRead)
.putPcollections(
"pyRead.out", PCollection.newBuilder().setUniqueName("pyRead.out").build())
.putTransforms("goRead", goRead)
.putPcollections(
"goRead.out", PCollection.newBuilder().setUniqueName("goRead.out").build())
.putTransforms(
"flatten",
PTransform.newBuilder()
.putInputs("py_input", "pyRead.out")
.putInputs("go_input", "goRead.out")
.putOutputs("output", "flatten.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN)
.build())
.build())
.putPcollections("flatten.out", flattenPc)
.putTransforms("pyParDo", pyParDo)
.putPcollections(
"pyParDo.out", PCollection.newBuilder().setUniqueName("pyParDo.out").build())
.putTransforms("goWindow", goWindow)
.putPcollections(
"goWindow.out", PCollection.newBuilder().setUniqueName("goWindow.out").build())
.putEnvironments("go", Environments.createDockerEnvironment("go"))
.putEnvironments("py", Environments.createDockerEnvironment("py"))
.build();
QueryablePipeline p = QueryablePipeline.forPrimitivesIn(components);
ExecutableStage readFromPy =
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, ImmutableSet.of(PipelineNode.pTransform("pyRead", pyRead)));
ExecutableStage readFromGo =
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, ImmutableSet.of(PipelineNode.pTransform("goRead", goRead)));
assertThat(
readFromPy.getOutputPCollections(),
contains(PipelineNode.pCollection("flatten.out", flattenPc)));
// Each stage must materialize the flatten output so consumers in the other environment can read
// it; this means pyParDo can't be fused into the py stage, since it reads that materialized
// PCollection. The same is true for goWindow in the go stage.
assertThat(
readFromPy.getTransforms(), not(hasItem(PipelineNode.pTransform("pyParDo", pyParDo))));
assertThat(
readFromGo.getOutputPCollections(),
contains(PipelineNode.pCollection("flatten.out", flattenPc)));
assertThat(
readFromGo.getTransforms(), not(hasItem(PipelineNode.pTransform("goWindow", goWindow))));
}
@Test
public void materializesWithDifferentEnvConsumer() {
// (impulse.out) -> parDo -> parDo.out -> window -> window.out
// Fuses into
// (impulse.out) -> parDo -> (parDo.out)
// (parDo.out) -> window -> window.out
Environment env = Environments.createDockerEnvironment("common");
PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("out", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PCollection parDoOutput = PCollection.newBuilder().setUniqueName("parDo.out").build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("parDo", parDoTransform)
.putPcollections("parDo.out", parDoOutput)
.putTransforms(
"window",
PTransform.newBuilder()
.putInputs("input", "parDo.out")
.putOutputs("output", "window.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(
SdkFunctionSpec.newBuilder().setEnvironmentId("rare"))
.build()
.toByteString()))
.build())
.putPcollections(
"window.out", PCollection.newBuilder().setUniqueName("window.out").build())
.putEnvironments("rare", Environments.createDockerEnvironment("rare"))
.putEnvironments("common", env)
.build());
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, p.getPerElementConsumers(impulseOutputNode));
assertThat(
subgraph.getOutputPCollections(),
contains(PipelineNode.pCollection("parDo.out", parDoOutput)));
assertThat(subgraph.getInputPCollection(), equalTo(impulseOutputNode));
assertThat(subgraph.getEnvironment(), equalTo(env));
assertThat(
subgraph.getTransforms(), contains(PipelineNode.pTransform("parDo", parDoTransform)));
}
@Test
public void materializesWithDifferentEnvSibling() {
// (impulse.out) -> read -> read.out -> parDo -> parDo.out
// read.out -> window -> window.out
// Fuses into
// (impulse.out) -> read -> (read.out)
// (read.out) -> parDo -> parDo.out
// (read.out) -> window -> window.out
// The window can't be fused into the stage, which forces read.out to be materialized.
// The parDo in this case _could_ be fused into the stage, but is not, for simplicity of
// implementation.
Environment env = Environments.createDockerEnvironment("common");
PTransform readTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("read", readTransform)
.putPcollections(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build())
.putTransforms(
"parDo",
PTransform.newBuilder()
.putInputs("input", "read.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(
SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build())
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putTransforms(
"window",
PTransform.newBuilder()
.putInputs("input", "read.out")
.putOutputs("output", "window.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(
SdkFunctionSpec.newBuilder().setEnvironmentId("rare"))
.build()
.toByteString()))
.build())
.putPcollections(
"window.out", PCollection.newBuilder().setUniqueName("window.out").build())
.putEnvironments("rare", Environments.createDockerEnvironment("rare"))
.putEnvironments("common", env)
.build());
PTransformNode readNode = PipelineNode.pTransform("read", readTransform);
PCollectionNode readOutput = getOnlyElement(p.getOutputPCollections(readNode));
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, ImmutableSet.of(PipelineNode.pTransform("read", readTransform)));
assertThat(subgraph.getOutputPCollections(), contains(readOutput));
assertThat(subgraph.getTransforms(), contains(readNode));
}
@Test
public void materializesWithSideInputConsumer() {
// (impulse.out) -> read -> read.out -----------> parDo -> parDo.out -> window -> window.out
// (impulse.out) -> side_read -> side_read.out /
// Where parDo takes side_read as a side input, fuses into
// (impulse.out) -> read -> (read.out)
// (impulse.out) -> side_read -> (side_read.out)
// (read.out) -> parDo -> parDo.out -> window -> window.out
// parDo doesn't have a per-element consumer from side_read.out, so it can't root a stage
// which consumes from that materialized collection. Nodes with side inputs must root a stage,
// but do not restrict fusion of consumers.
Environment env = Environments.createDockerEnvironment("common");
PTransform readTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("read", readTransform)
.putPcollections(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build())
.putTransforms(
"side_read",
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN))
.putInputs("input", "impulse.out")
.putOutputs("output", "side_read.out")
.build())
.putPcollections(
"side_read.out",
PCollection.newBuilder().setUniqueName("side_read.out").build())
.putTransforms(
"parDo",
PTransform.newBuilder()
.putInputs("input", "read.out")
.putInputs("side_input", "side_read.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(
SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.putSideInputs("side_input", SideInput.getDefaultInstance())
.build()
.toByteString()))
.build())
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putTransforms(
"window",
PTransform.newBuilder()
.putInputs("input", "read.out")
.putOutputs("output", "window.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
.setPayload(
WindowIntoPayload.newBuilder()
.setWindowFn(
SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build())
.putPcollections(
"window.out", PCollection.newBuilder().setUniqueName("window.out").build())
.putEnvironments("common", env)
.build());
PTransformNode readNode = PipelineNode.pTransform("read", readTransform);
PCollectionNode readOutput = getOnlyElement(p.getOutputPCollections(readNode));
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(p, impulseOutputNode, ImmutableSet.of(readNode));
assertThat(subgraph.getOutputPCollections(), contains(readOutput));
assertThat(subgraph, hasSubtransforms(readNode.getId()));
}
@Test
public void sideInputIncludedInStage() {
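// (impulse.out) -> read -> read.out -> parDo -> parDo.out
// (impulse.out) -> side_read -> side_read.out, consumed by parDo as a side input
// The stage rooted at read.out should record the side input reference for parDo.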
Environment env = Environments.createDockerEnvironment("common");
PTransform readTransform =
PTransform.newBuilder()
.setUniqueName("read")
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform parDoTransform =
PTransform.newBuilder()
.setUniqueName("parDo")
.putInputs("input", "read.out")
.putInputs("side_input", "side_read.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.putSideInputs("side_input", SideInput.getDefaultInstance())
.build()
.toByteString()))
.build();
PCollection sideInputPCollection =
PCollection.newBuilder().setUniqueName("side_read.out").build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("read", readTransform)
.putPcollections(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build())
.putTransforms(
"side_read",
PTransform.newBuilder()
.setUniqueName("side_read")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN))
.putInputs("input", "impulse.out")
.putOutputs("output", "side_read.out")
.build())
.putPcollections("side_read.out", sideInputPCollection)
.putTransforms("parDo", parDoTransform)
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putEnvironments("common", env)
.build());
PCollectionNode readOutput =
getOnlyElement(p.getOutputPCollections(PipelineNode.pTransform("read", readTransform)));
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p, readOutput, ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
PTransformNode parDoNode = PipelineNode.pTransform("parDo", parDoTransform);
SideInputReference sideInputRef =
SideInputReference.of(
parDoNode,
"side_input",
PipelineNode.pCollection("side_read.out", sideInputPCollection));
assertThat(subgraph.getSideInputs(), contains(sideInputRef));
assertThat(subgraph.getOutputPCollections(), emptyIterable());
}
@Test
public void executableStageProducingSideInputMaterializesIt() {
// impulse -> ParDo(createSide)
// impulse -> ParDo(processMain), with a side input from createSide
// The ExecutableStage executing createSide must have an output.
Environment env = Environments.createDockerEnvironment("common");
PTransform impulse =
PTransform.newBuilder()
.setUniqueName("impulse")
.putOutputs("output", "impulsePC")
.setSpec(FunctionSpec.newBuilder().setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN))
.build();
PTransform createSide =
PTransform.newBuilder()
.setUniqueName("createSide")
.putInputs("input", "impulsePC")
.putOutputs("output", "sidePC")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform processMain =
PTransform.newBuilder()
.setUniqueName("processMain")
.putInputs("main", "impulsePC")
.putInputs("side", "sidePC")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.putSideInputs("side", SideInput.getDefaultInstance())
.build()
.toByteString()))
.build();
PCollection sidePC = PCollection.newBuilder().setUniqueName("sidePC").build();
PCollection impulsePC = PCollection.newBuilder().setUniqueName("impulsePC").build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("impulse", impulse)
.putTransforms("createSide", createSide)
.putTransforms("processMain", processMain)
.putPcollections("impulsePC", impulsePC)
.putPcollections("sidePC", sidePC)
.putEnvironments("common", env)
.build());
PCollectionNode impulseOutput =
getOnlyElement(p.getOutputPCollections(PipelineNode.pTransform("impulse", impulse)));
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p, impulseOutput, ImmutableSet.of(PipelineNode.pTransform("createSide", createSide)));
assertThat(
subgraph.getOutputPCollections(), contains(PipelineNode.pCollection("sidePC", sidePC)));
}
@Test
public void userStateIncludedInStage() {
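// (impulse.out) -> read -> read.out -> parDo -> parDo.out, where parDo declares a user state spec
// The stage rooted at read.out should record the user state reference, keyed by parDo's main
// input PCollection (read.out).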
Environment env = Environments.createDockerEnvironment("common");
PTransform readTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "read.out")
.putOutputs("output", "parDo.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.putStateSpecs("state_spec", StateSpec.getDefaultInstance())
.build()
.toByteString()))
.build();
PCollection userStateMainInputPCollection =
PCollection.newBuilder().setUniqueName("read.out").build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("read", readTransform)
.putPcollections("read.out", userStateMainInputPCollection)
.putTransforms(
"user_state",
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "user_state.out")
.build())
.putPcollections(
"user_state.out",
PCollection.newBuilder().setUniqueName("user_state.out").build())
.putTransforms("parDo", parDoTransform)
.putPcollections(
"parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putEnvironments("common", env)
.build());
PCollectionNode readOutput =
getOnlyElement(p.getOutputPCollections(PipelineNode.pTransform("read", readTransform)));
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(
p, readOutput, ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
PTransformNode parDoNode = PipelineNode.pTransform("parDo", parDoTransform);
UserStateReference userStateRef =
UserStateReference.of(
parDoNode,
"state_spec",
PipelineNode.pCollection("read.out", userStateMainInputPCollection));
assertThat(subgraph.getUserStates(), contains(userStateRef));
assertThat(subgraph.getOutputPCollections(), emptyIterable());
}
@Test
public void materializesWithGroupByKeyConsumer() {
// (impulse.out) -> read -> read.out -> gbk -> gbk.out
// Fuses to
// (impulse.out) -> read -> (read.out)
// GBK is the responsibility of the runner, so it is not included in a stage.
Environment env = Environments.createDockerEnvironment("common");
PTransform readTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
.putOutputs("output", "read.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
.build()
.toByteString()))
.build();
QueryablePipeline p =
QueryablePipeline.forPrimitivesIn(
partialComponents
.toBuilder()
.putTransforms("read", readTransform)
.putPcollections(
"read.out", PCollection.newBuilder().setUniqueName("read.out").build())
.putTransforms(
"gbk",
PTransform.newBuilder()
.putInputs("input", "read.out")
.putOutputs("output", "gbk.out")
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN))
.build())
.putPcollections(
"gbk.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
.putEnvironments("common", env)
.build());
PTransformNode readNode = PipelineNode.pTransform("read", readTransform);
PCollectionNode readOutput = getOnlyElement(p.getOutputPCollections(readNode));
ExecutableStage subgraph =
GreedyStageFuser.forGrpcPortRead(p, impulseOutputNode, ImmutableSet.of(readNode));
assertThat(subgraph.getOutputPCollections(), contains(readOutput));
assertThat(subgraph, hasSubtransforms(readNode.getId()));
}
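/**
 * Matches an {@link ExecutableStage} whose fused transforms have exactly the given ids, no more
 * and no fewer.
 */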
private static TypeSafeMatcher<ExecutableStage> hasSubtransforms(String id, String... ids) {
Set<String> expectedTransforms = ImmutableSet.<String>builder().add(id).add(ids).build();
return new TypeSafeMatcher<ExecutableStage>() {
@Override
protected boolean matchesSafely(ExecutableStage executableStage) {
// NOTE: Transform ids are unique within a pipeline, so comparing the two id sets is sufficient.
Set<String> stageTransforms =
executableStage.getTransforms().stream()
.map(PTransformNode::getId)
.collect(Collectors.toSet());
return stageTransforms.containsAll(expectedTransforms)
&& expectedTransforms.containsAll(stageTransforms);
}
@Override
public void describeTo(Description description) {
description.appendText("ExecutableStage with subtransform ids: " + expectedTransforms);
}
};
}
}