blob: 2c422a574cb430d4cd5289a7c2c36961eaebc8e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/** Tests for {@link PTransformTranslation}. */
@RunWith(Parameterized.class)
public class PTransformTranslationTest {
@Parameters(name = "{index}: {0}")
public static Iterable<ToAndFromProtoSpec> data() {
// This pipeline exists for construction, not to run any test.
// TODO: Leaf node with understood payload - i.e. validate payloads
ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create()));
ToAndFromProtoSpec readMultipleInAndOut =
ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create()));
TestPipeline compositeReadPipeline = TestPipeline.create();
ToAndFromProtoSpec compositeRead =
ToAndFromProtoSpec.composite(
generateSequence(compositeReadPipeline),
ToAndFromProtoSpec.leaf(read(compositeReadPipeline)));
ToAndFromProtoSpec rawLeafNullSpec =
ToAndFromProtoSpec.leaf(rawPTransformWithNullSpec(TestPipeline.create()));
return ImmutableList.<ToAndFromProtoSpec>builder()
.add(readLeaf)
.add(readMultipleInAndOut)
.add(compositeRead)
.add(rawLeafNullSpec)
// TODO: Composite with multiple children
// TODO: Composite with a composite child
.build();
}
@AutoValue
abstract static class ToAndFromProtoSpec {
public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) {
return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec(
transform, Collections.emptyList());
}
public static ToAndFromProtoSpec composite(
AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) {
List<ToAndFromProtoSpec> childSpecs = new ArrayList<>();
childSpecs.add(spec);
childSpecs.addAll(Arrays.asList(specs));
return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec(topLevel, childSpecs);
}
abstract AppliedPTransform<?, ?, ?> getTransform();
abstract Collection<ToAndFromProtoSpec> getChildren();
}
@Parameter(0)
public ToAndFromProtoSpec spec;
@Test
public void toAndFromProto() throws IOException {
SdkComponents components = SdkComponents.create(spec.getTransform().getPipeline().getOptions());
RunnerApi.PTransform converted = convert(spec, components);
Components protoComponents = components.toComponents();
// Sanity checks
assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size()));
assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size()));
assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size()));
assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName()));
for (PValue inputValue : spec.getTransform().getInputs().values()) {
PCollection<?> inputPc = (PCollection<?>) inputValue;
protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc));
}
for (PValue outputValue : spec.getTransform().getOutputs().values()) {
PCollection<?> outputPc = (PCollection<?>) outputValue;
protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc));
}
}
private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components)
throws IOException {
List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>();
for (ToAndFromProtoSpec child : spec.getChildren()) {
childTransforms.add(child.getTransform());
System.out.println("Converting child " + child);
convert(child, components);
// Sanity call
components.getExistingPTransformId(child.getTransform());
}
RunnerApi.PTransform convert =
PTransformTranslation.toProto(spec.getTransform(), childTransforms, components);
// Make sure the converted transform is registered. Convert it independently, but if this is a
// child spec, the child must be in the components.
components.registerPTransform(spec.getTransform(), childTransforms);
return convert;
}
private static class TestDoFn extends DoFn<Long, KV<Long, String>> {
// Exists to stop the ParDo application from throwing
@ProcessElement
public void process(ProcessContext context) {}
}
private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) {
GenerateSequence sequence = GenerateSequence.from(0);
PCollection<Long> pcollection = pipeline.apply(sequence);
return AppliedPTransform.of(
"Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline);
}
private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) {
Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded());
PCollection<Long> pcollection = pipeline.apply(transform);
return AppliedPTransform.of(
"ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline);
}
private static AppliedPTransform<?, ?, ?> rawPTransformWithNullSpec(Pipeline pipeline) {
PTransformTranslation.RawPTransform<PBegin, PDone> rawPTransform =
new PTransformTranslation.RawPTransform<PBegin, PDone>() {
@Override
public String getUrn() {
return "fake/urn";
}
@Nullable
@Override
public RunnerApi.FunctionSpec getSpec() {
return null;
}
};
return AppliedPTransform.<PBegin, PDone, PTransform<PBegin, PDone>>of(
"RawPTransformWithNoSpec",
pipeline.begin().expand(),
PDone.in(pipeline).expand(),
rawPTransform,
pipeline);
}
private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
PCollectionView<String> view = pipeline.apply(Create.of("foo")).apply(View.asSingleton());
PCollection<Long> input = pipeline.apply(GenerateSequence.from(0));
ParDo.MultiOutput<Long, KV<Long, String>> parDo =
ParDo.of(new TestDoFn())
.withSideInputs(view)
.withOutputTags(
new TupleTag<KV<Long, String>>() {},
TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
PCollectionTuple output = input.apply(parDo);
Map<TupleTag<?>, PValue> inputs = new HashMap<>();
inputs.putAll(parDo.getAdditionalInputs());
inputs.putAll(input.expand());
return AppliedPTransform
.<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
"MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline);
}
}