blob: 6ca799b12087cc7d2875c5bd9b3bc370d80ac956 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesCrossLanguageTransforms;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ConnectivityState;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannelBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Runner Validation Test Suite for Cross-language Transforms.
*
* <p>As per Beam's Portability Framework design, Cross-language transforms should work out of the
* box. In spite of this, there always exists a possibility of rough edges existing. It could be
* caused due to unpolished implementation of any part of the execution code path, for example: –>
* Transform expansion [SDK] –> Pipeline construction [SDK] –> Cross-language artifact staging
* [Runner] –> Language specific serialization/deserialization of PCollection (and other data types)
* [Runner/SDK]
*
* <p>In an effort to improve developer visibility into potential problems, this test suite
* validates correct execution of 6 Core Beam transforms when used as cross-language transforms
* within the Java SDK from any foreign SDK: –> ParDo
* (https://beam.apache.org/documentation/programming-guide/#pardo) –> GroupByKey
* (https://beam.apache.org/documentation/programming-guide/#groupbykey) –> CoGroupByKey
* (https://beam.apache.org/documentation/programming-guide/#cogroupbykey) –> Combine
* (https://beam.apache.org/documentation/programming-guide/#combine) –> Flatten
* (https://beam.apache.org/documentation/programming-guide/#flatten) –> Partition
* (https://beam.apache.org/documentation/programming-guide/#partition)
*
* <p>See Runner Validation Test Plan for Cross-language transforms
* (https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA) for further
* details.
*/
@RunWith(JUnit4.class)
@SuppressWarnings({
  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class ValidateRunnerXlangTest implements Serializable {
  @Rule public transient TestPipeline testPipeline = TestPipeline.create();

  // URNs for core cross-language transforms.
  // See https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA for further
  // details.
  private static final String TEST_PREFIX_URN = "beam:transforms:xlang:test:prefix";
  private static final String TEST_MULTI_URN = "beam:transforms:xlang:test:multi";
  private static final String TEST_GBK_URN = "beam:transforms:xlang:test:gbk";
  private static final String TEST_CGBK_URN = "beam:transforms:xlang:test:cgbk";
  private static final String TEST_COMGL_URN = "beam:transforms:xlang:test:comgl";
  private static final String TEST_COMPK_URN = "beam:transforms:xlang:test:compk";
  private static final String TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten";
  private static final String TEST_PARTITION_URN = "beam:transforms:xlang:test:partition";

  /** Maximum number of times the expansion service channel is re-polled before giving up. */
  private static final int READY_MAX_RETRIES = 30;

  /** Pause between two consecutive connectivity polls, in milliseconds. */
  private static final long READY_POLL_INTERVAL_MILLIS = 500L;

  private static String expansionAddr;
  private static String expansionJar;

  /**
   * Reads the expansion service location from the {@code expansionPort} and {@code expansionJar}
   * system properties.
   *
   * @throws NumberFormatException if {@code expansionPort} is unset or is not an integer
   */
  @BeforeClass
  public static void setUpClass() {
    expansionAddr =
        String.format("localhost:%s", Integer.valueOf(System.getProperty("expansionPort")));
    expansionJar = System.getProperty("expansionJar");
  }

  /**
   * Registers the expansion jar as a {@code jar_packages} experiment on the test pipeline options
   * and waits for the expansion service channel to become ready.
   */
  @Before
  public void setUp() {
    ExperimentalOptions.addExperiment(
        testPipeline.getOptions().as(ExperimentalOptions.class), "jar_packages=" + expansionJar);
    waitForReady();
  }

  /**
   * Runs the pipeline built by the test method and asserts that it finished in the {@code DONE}
   * state. The test methods only construct the pipeline; execution happens here.
   */
  @After
  public void tearDown() {
    // Kept local: the result is only needed here and should not become state of this
    // Serializable test class.
    PipelineResult pipelineResult = testPipeline.run();
    pipelineResult.waitUntilFinish();
    assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
  }

  /**
   * Polls the connectivity state of a channel to {@link #expansionAddr} until it reports {@code
   * READY} or the retry budget is exhausted.
   *
   * <p>This is best effort: if the channel never becomes ready the method returns normally and the
   * test proceeds.
   *
   * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored
   *     and the {@link InterruptedException} is attached as the cause
   */
  private static void waitForReady() {
    ManagedChannel channel = ManagedChannelBuilder.forTarget(expansionAddr).build();
    try {
      // getState(true) also asks an idle channel to start connecting.
      ConnectivityState state = channel.getState(true);
      for (int retry = 0;
          retry < READY_MAX_RETRIES && state != ConnectivityState.READY;
          retry++) {
        Thread.sleep(READY_POLL_INTERVAL_MILLIS);
        state = channel.getState(true);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted.", e);
    } finally {
      // Always release the probe channel, including on interruption.
      channel.shutdownNow();
    }
  }

  /**
   * Motivation behind singleInputOutputTest.
   *
   * <p>Target transform – ParDo (https://beam.apache.org/documentation/programming-guide/#pardo)
   *
   * <p>Test scenario – Mapping elements from a single input collection to a single output
   * collection.
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code PCollection<?>} to external transforms
   *   <li>{@code PCollection<?>} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void singleInputOutputTest() throws IOException {
    PCollection<String> col =
        testPipeline
            .apply(Create.of("1", "2", "3"))
            .apply(External.of(TEST_PREFIX_URN, toStringPayloadBytes("0"), expansionAddr));
    PAssert.that(col).containsInAnyOrder("01", "02", "03");
  }

  /**
   * Motivation behind multiInputOutputWithSideInputTest.
   *
   * <p>Target transform – ParDo (https://beam.apache.org/documentation/programming-guide/#pardo)
   *
   * <p>Test scenario – Mapping elements from multiple input collections (main and side) to
   * multiple output collections (main and side).
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code PCollectionTuple} to external transforms
   *   <li>{@code PCollectionTuple} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void multiInputOutputWithSideInputTest() {
    PCollection<String> main1 = testPipeline.apply("createMain1", Create.of("a", "bb"));
    PCollection<String> main2 = testPipeline.apply("createMain2", Create.of("x", "yy", "zzz"));
    PCollection<String> side = testPipeline.apply("createSide", Create.of("s"));
    PCollectionTuple pTuple =
        PCollectionTuple.of("main1", main1)
            .and("main2", main2)
            .and("side", side)
            .apply(External.of(TEST_MULTI_URN, new byte[] {}, expansionAddr).withMultiOutputs());
    PAssert.that(pTuple.get("main")).containsInAnyOrder("as", "bbs", "xs", "yys", "zzzs");
    PAssert.that(pTuple.get("side")).containsInAnyOrder("ss");
  }

  /**
   * Motivation behind groupByKeyTest.
   *
   * <p>Target transform – GroupByKey
   * (https://beam.apache.org/documentation/programming-guide/#groupbykey)
   *
   * <p>Test scenario – Grouping a collection of {@code KV<K, V>} to a collection of {@code KV<K,
   * Iterable<V>>} by key.
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code PCollection<KV<?, ?>>} to external transforms
   *   <li>{@code PCollection<KV<?, Iterable<?>>>} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void groupByKeyTest() {
    PCollection<KV<Long, Iterable<String>>> gbkCol =
        testPipeline
            .apply(Create.of(KV.of(0L, "1"), KV.of(0L, "2"), KV.of(1L, "3")))
            .apply(External.of(TEST_GBK_URN, new byte[] {}, expansionAddr));
    PCollection<String> col =
        gbkCol.apply(
            MapElements.into(TypeDescriptors.strings())
                .via((KV<Long, Iterable<String>> kv) -> formatSortedGroup(kv)));
    PAssert.that(col).containsInAnyOrder("0:1,2", "1:3");
  }

  /**
   * Motivation behind coGroupByKeyTest.
   *
   * <p>Target transform – CoGroupByKey
   * (https://beam.apache.org/documentation/programming-guide/#cogroupbykey)
   *
   * <p>Test scenario – Grouping multiple input collections with keys to a collection of {@code
   * KV<K, CoGbkResult>} by key.
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code KeyedPCollectionTuple<?>} to external transforms
   *   <li>{@code PCollection<KV<?, Iterable<?>>>} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void coGroupByKeyTest() {
    PCollection<KV<Long, String>> col1 =
        testPipeline.apply("createCol1", Create.of(KV.of(0L, "1"), KV.of(0L, "2"), KV.of(1L, "3")));
    PCollection<KV<Long, String>> col2 =
        testPipeline.apply("createCol2", Create.of(KV.of(0L, "4"), KV.of(1L, "5"), KV.of(1L, "6")));
    PCollection<KV<Long, Iterable<String>>> cgbkCol =
        KeyedPCollectionTuple.of("col1", col1)
            .and("col2", col2)
            .apply(External.of(TEST_CGBK_URN, new byte[] {}, expansionAddr));
    PCollection<String> col =
        cgbkCol.apply(
            MapElements.into(TypeDescriptors.strings())
                .via((KV<Long, Iterable<String>> kv) -> formatSortedGroup(kv)));
    PAssert.that(col).containsInAnyOrder("0:1,2,4", "1:3,5,6");
  }

  /**
   * Motivation behind combineGloballyTest.
   *
   * <p>Target transform – Combine
   * (https://beam.apache.org/documentation/programming-guide/#combine)
   *
   * <p>Test scenario – Combining elements globally with a predefined simple CombineFn.
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code PCollection<?>} to external transforms
   *   <li>{@code PCollection<?>} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void combineGloballyTest() {
    PCollection<Long> col =
        testPipeline
            .apply(Create.of(1L, 2L, 3L))
            .apply(External.of(TEST_COMGL_URN, new byte[] {}, expansionAddr));
    PAssert.that(col).containsInAnyOrder(6L);
  }

  /**
   * Motivation behind combinePerKeyTest.
   *
   * <p>Target transform – Combine
   * (https://beam.apache.org/documentation/programming-guide/#combine)
   *
   * <p>Test scenario – Combining elements per key with a predefined simple merging function.
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code PCollection<?>} to external transforms
   *   <li>{@code PCollection<?>} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void combinePerKeyTest() {
    PCollection<KV<String, Long>> col =
        testPipeline
            .apply(Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L)))
            .apply(External.of(TEST_COMPK_URN, new byte[] {}, expansionAddr));
    PAssert.that(col).containsInAnyOrder(KV.of("a", 3L), KV.of("b", 3L));
  }

  /**
   * Motivation behind flattenTest.
   *
   * <p>Target transform – Flatten
   * (https://beam.apache.org/documentation/programming-guide/#flatten)
   *
   * <p>Test scenario – Merging multiple collections into a single collection.
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code PCollectionList<?>} to external transforms
   *   <li>{@code PCollection<?>} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void flattenTest() {
    PCollection<Long> col1 = testPipeline.apply("createCol1", Create.of(1L, 2L, 3L));
    PCollection<Long> col2 = testPipeline.apply("createCol2", Create.of(4L, 5L, 6L));
    PCollection<Long> col =
        PCollectionList.of(col1)
            .and(col2)
            .apply(External.of(TEST_FLATTEN_URN, new byte[] {}, expansionAddr));
    PAssert.that(col).containsInAnyOrder(1L, 2L, 3L, 4L, 5L, 6L);
  }

  /**
   * Motivation behind partitionTest.
   *
   * <p>Target transform – Partition
   * (https://beam.apache.org/documentation/programming-guide/#partition)
   *
   * <p>Test scenario – Splitting a single collection into multiple collections with a predefined
   * simple PartitionFn.
   *
   * <p>Boundary conditions checked:
   *
   * <ul>
   *   <li>{@code PCollection<?>} to external transforms
   *   <li>{@code PCollectionList<?>} from external transforms
   * </ul>
   */
  @Test
  @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
  public void partitionTest() {
    PCollectionTuple col =
        testPipeline
            .apply(Create.of(1L, 2L, 3L, 4L, 5L, 6L))
            .apply(
                External.of(TEST_PARTITION_URN, new byte[] {}, expansionAddr).withMultiOutputs());
    PAssert.that(col.get("0")).containsInAnyOrder(2L, 4L, 6L);
    PAssert.that(col.get("1")).containsInAnyOrder(1L, 3L, 5L);
  }

  /**
   * Renders a grouped element as {@code "<key>:<v1>,<v2>,..."} with the values sorted.
   *
   * <p>Sorting makes the rendered string independent of the iteration order of the grouped values,
   * so the tests can compare against fixed expected strings.
   *
   * @param kv a key with its grouped string values
   * @return the key, a colon, and the comma-joined values in natural {@link String} order
   */
  private static String formatSortedGroup(KV<Long, Iterable<String>> kv) {
    String[] values = Iterables.toArray(kv.getValue(), String.class);
    Arrays.sort(values);
    return String.format("%s:%s", kv.getKey(), String.join(",", values));
  }

  /**
   * Builds a serialized {@code ExternalConfigurationPayload} carrying a single-field configuration
   * row.
   *
   * <p>The row has one {@code STRING} field named {@code data}; it is encoded with a {@link
   * RowCoder} for its schema and stored in the payload together with the proto form of that
   * schema.
   *
   * @param data value for the {@code data} field of the configuration row
   * @return the payload message serialized to bytes
   * @throws IOException if encoding the row fails
   */
  private static byte[] toStringPayloadBytes(String data) throws IOException {
    Row configRow =
        Row.withSchema(Schema.of(Field.of("data", FieldType.STRING)))
            .withFieldValue("data", data)
            .build();
    Schema configSchema = configRow.getSchema();
    ByteString.Output outputStream = ByteString.newOutput();
    RowCoder.of(configSchema).encode(configRow, outputStream);
    ExternalTransforms.ExternalConfigurationPayload payload =
        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
            .setSchema(SchemaTranslation.schemaToProto(configSchema, false))
            .setPayload(outputStream.toByteString())
            .build();
    return payload.toByteArray();
  }

  /**
   * Encodes a string with {@link StringUtf8Coder} and returns the resulting bytes.
   *
   * <p>NOTE(review): no test in this class currently calls this helper.
   *
   * @param str the string to encode
   * @return the bytes written by the coder
   * @throws IOException if the coder fails to write the string
   */
  private static byte[] encodeString(String str) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    StringUtf8Coder.of().encode(str, baos);
    return baos.toByteArray();
  }
}