Added more test for issue 134 (#224)
* Update README.md
Add notes regarding issue with JAVA_HOME variable and setting up Wayang as well as specifying (current) supported Java version.
* added partial test class for flink
* fixed flinktestbase
* added more test issue 134
* added license header
* added license header
Co-authored-by: Bertty Contreras-Rojas <bertty@databloom.ai>
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java
index 73fd91f..6c3c5e6 100644
--- a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java
@@ -28,6 +28,9 @@
import java.util.Arrays;
import java.util.List;
+/**
+ * Test suite for {@link FlinkCartesianOperator}.
+ */
public class FlinkCartesianOperatorTest extends FlinkOperatorTestBase {
@Test
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCollectionSourceTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCollectionSourceTest.java
new file mode 100644
index 0000000..a983ea5
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCollectionSourceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Test suite for the {@link FlinkCollectionSource}.
+ */
+public class FlinkCollectionSourceTest extends FlinkOperatorTestBase{
+ @Test
+ public void testExecution() throws Exception {
+ Set<Integer> inputValues = new HashSet<>(Arrays.asList(1, 2, 3));
+ FlinkCollectionSource<Integer> collectionSource = new FlinkCollectionSource<Integer>(
+ inputValues,
+ DataSetType.createDefault(Integer.class));
+ DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+ // Set up the ChannelInstances.
+ final ChannelInstance[] inputs = new ChannelInstance[]{};
+ final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+ // Execute.
+ this.evaluate(collectionSource, inputs, outputs);
+
+ final Set<Integer> outputValues = new HashSet<>(output.<Integer>provideDataSet().collect());
+ Assert.assertEquals(outputValues, inputValues);
+ }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCountOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCountOperatorTest.java
new file mode 100644
index 0000000..577b7d5
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCountOperatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.xml.crypto.Data;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Test suite for {@link FlinkCountOperator}.
+ */
+public class FlinkCountOperatorTest extends FlinkOperatorTestBase{
+ @Test
+ public void testExecution() throws Exception {
+ // Prepare test data.
+ DataSetChannel.Instance input = this.createDataSetChannelInstance(Arrays.asList(1, 2, 3, 4, 5));
+ DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+ // Build the count operator.
+ FlinkCountOperator<Integer> countOperator =
+ new FlinkCountOperator<Integer>(DataSetType.createDefaultUnchecked(Integer.class));
+
+ // Set up the ChannelInstances.
+ final ChannelInstance[] inputs = new ChannelInstance[]{input};
+ final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+ // Execute.
+ this.evaluate(countOperator, inputs, outputs);
+
+ // Verify the outcome.
+ final List<Object> result = output.provideDataSet().collect();
+ Assert.assertEquals(1,result.size());
+ Assert.assertEquals(Long.valueOf(5),result.iterator().next());
+
+ }
+
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkDistinctOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkDistinctOperatorTest.java
new file mode 100644
index 0000000..6edb68e
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkDistinctOperatorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.xml.crypto.Data;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test suite for {@link FlinkDistinctOperator}.
+ */
+//The test assert error expected 4, actual 1
+public class FlinkDistinctOperatorTest extends FlinkOperatorTestBase{
+
+ @Test
+ public void testExecution() throws Exception {
+ // Prepare test data.
+ List<Integer> inputData = Arrays.asList(0, 1, 1, 6, 2, 2, 6, 6);
+
+ // Build the distinct operator.
+ FlinkDistinctOperator<Integer> distinctOperator =
+ new FlinkDistinctOperator<>(
+ DataSetType.createDefaultUnchecked(Integer.class)
+ );
+
+ // Set up the ChannelInstances.
+ final ChannelInstance[] inputs = new ChannelInstance[]{this.createDataSetChannelInstance(inputData)};
+ final ChannelInstance[] outputs = new ChannelInstance[]{this.createDataSetChannelInstance()};
+
+ // Execute.
+ this.evaluate(distinctOperator, inputs, outputs);
+
+ // Verify the outcome.
+ final List<Integer> result = ((DataSetChannel.Instance) outputs[0]).<Integer>provideDataSet().collect();
+ for(Object e : result){
+ System.out.println(e);
+ }
+ Assert.assertEquals(4, result.size());
+ Assert.assertEquals(Arrays.asList(0, 1, 6, 2), result);
+
+ }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFilterOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFilterOperatorTest.java
new file mode 100644
index 0000000..65cbb62
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFilterOperatorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.function.PredicateDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * Test suite for {@link FlinkFilterOperator}.
+ */
+public class FlinkFilterOperatorTest extends FlinkOperatorTestBase{
+ @Test
+ public void testExecution() throws Exception {
+ // Prepare test data.
+ DataSetChannel.Instance input = this.createDataSetChannelInstance(Arrays.asList(0, 1, 1, 2, 6));
+ DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+ // Build the distinct operator.
+ FlinkFilterOperator<Integer> filterOperator =
+ new FlinkFilterOperator<>(
+ DataSetType.createDefaultUnchecked(Integer.class),
+ new PredicateDescriptor<>(item -> (item > 0), Integer.class)
+ );
+
+ // Set up the ChannelInstances.
+ ChannelInstance[] inputs = new ChannelInstance[]{input};
+ ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+ // Execute.
+ this.evaluate(filterOperator, inputs, outputs);
+
+ // Verify the outcome.
+ final List<Integer> result = output.<Integer>provideDataSet().collect();
+ Assert.assertEquals(4, result.size());
+ Assert.assertEquals(Arrays.asList(1, 1, 2, 6), result);
+
+ }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFlatMapOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFlatMapOperatorTest.java
new file mode 100644
index 0000000..7a7b53f
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFlatMapOperatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.function.FlatMapDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test suite for {@link FlinkFlatMapOperator}.
+ */
+public class FlinkFlatMapOperatorTest extends FlinkOperatorTestBase{
+ @Test
+ public void testExecution() throws Exception {
+ // Prepare test data.
+ DataSetChannel.Instance input = this.createDataSetChannelInstance(Arrays.asList("one phrase", "two sentences", "three lines"));
+ DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+ FlinkFlatMapOperator<String, String> flatMapOperator = new FlinkFlatMapOperator<>(
+ DataSetType.createDefaultUnchecked(String.class),
+ DataSetType.createDefaultUnchecked(String.class),
+ new FlatMapDescriptor<>(phrase -> Arrays.asList(phrase.split(" ")), String.class, String.class)
+ );
+
+ // Set up the ChannelInstances.
+ ChannelInstance[] inputs = new ChannelInstance[]{input};
+ ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+ // Execute.
+ this.evaluate(flatMapOperator, inputs, outputs);
+
+ // Verify the outcome.
+ final List<String> result = output.<String>provideDataSet().collect();
+ Assert.assertEquals(6, result.size());
+ Assert.assertEquals(Arrays.asList("one", "phrase", "two", "sentences", "three", "lines"), result);
+
+ }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkGlobalMaterializedGroupOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkGlobalMaterializedGroupOperatorTest.java
new file mode 100644
index 0000000..1bb8362
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkGlobalMaterializedGroupOperatorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.WayangCollections;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Test suite for {@link FlinkGlobalMaterializedGroupOperator}.
+ */
+public class FlinkGlobalMaterializedGroupOperatorTest extends FlinkOperatorTestBase{
+
+ @Test
+ public void testExecution() throws Exception {
+ // Prepare test data.
+ Collection<Integer> inputCollection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ // Build the reduce operator.
+ FlinkGlobalMaterializedGroupOperator<Integer> globalGroup =
+ new FlinkGlobalMaterializedGroupOperator<>(
+ DataSetType.createDefaultUnchecked(Integer.class),
+ DataSetType.createGroupedUnchecked(Iterable.class)
+ );
+
+ // Execute.
+ ChannelInstance[] inputs = new DataSetChannel.Instance[]{this.createDataSetChannelInstance(inputCollection)};
+ ChannelInstance[] outputs = new DataSetChannel.Instance[]{this.createDataSetChannelInstance()};
+ this.evaluate(globalGroup, inputs, outputs);
+
+ // Verify the outcome.
+ final Collection<Iterable<Integer>> result = ((DataSetChannel.Instance) outputs[0]).<Iterable<Integer>>provideDataSet().collect();
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals(inputCollection, result.iterator().next());
+
+ }
+
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkJoinOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkJoinOperatorTest.java
new file mode 100644
index 0000000..d4b0e4b
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkJoinOperatorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.function.ProjectionDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+//problematic
+/**
+ * Test suite for {@link FlinkJoinOperator}.
+ */
+public class FlinkJoinOperatorTest extends FlinkOperatorTestBase{
+ @Test
+ public void testExecution() throws Exception {
+ // Prepare test data.
+ DataSetChannel.Instance input0 = this.createDataSetChannelInstance(Arrays.asList(
+ new Tuple2<>(1, "b"), new Tuple2<>(1, "c"), new Tuple2<>(2, "d"), new Tuple2<>(3, "e")));
+ DataSetChannel.Instance input1 = this.createDataSetChannelInstance(Arrays.asList(
+ new Tuple2<>("x", 1), new Tuple2<>("y", 1), new Tuple2<>("z", 2), new Tuple2<>("w", 4)));
+ DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+ // Build the Cartesian operator.
+ FlinkJoinOperator<Tuple2, Tuple2, Integer> join =
+ new FlinkJoinOperator<>(
+ DataSetType.createDefaultUnchecked(Tuple2.class),
+ DataSetType.createDefaultUnchecked(Tuple2.class),
+ new ProjectionDescriptor<>(
+ DataUnitType.createBasicUnchecked(Tuple2.class),
+ DataUnitType.createBasic(Integer.class),
+ "field0"),
+ new ProjectionDescriptor<>(
+ DataUnitType.createBasicUnchecked(Tuple2.class),
+ DataUnitType.createBasic(Integer.class),
+ "field1"));
+
+ // Set up the ChannelInstances.
+ final ChannelInstance[] inputs = new ChannelInstance[]{input0, input1};
+ final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+ // Execute.
+ this.evaluate(join, inputs, outputs);
+
+ // Verify the outcome.
+ final List<Tuple2<Tuple2<Integer, String>, Tuple2<String, Integer>>> result =
+ output.<Tuple2<Tuple2<Integer, String>, Tuple2<String, Integer>>>provideDataSet().collect();
+ Assert.assertEquals(5, result.size());
+ Assert.assertEquals(result.get(0), new Tuple2<>(new Tuple2<>(1, "b"), new Tuple2<>("x", 1)));
+ Assert.assertEquals(result.get(1), new Tuple2<>(new Tuple2<>(1, "b"), new Tuple2<>("y", 1)));
+ Assert.assertEquals(result.get(2), new Tuple2<>(new Tuple2<>(1, "c"), new Tuple2<>("x", 1)));
+ Assert.assertEquals(result.get(3), new Tuple2<>(new Tuple2<>(1, "c"), new Tuple2<>("y", 1)));
+ Assert.assertEquals(result.get(4), new Tuple2<>(new Tuple2<>(2, "d"), new Tuple2<>("z", 2)));
+
+
+ }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
index e926b0a..6bf1a0e 100644
--- a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
@@ -40,6 +40,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+/**
+ * Test base for {@link FlinkExecutionOperator} tests.
+ */
public class FlinkOperatorTestBase {
protected Configuration configuration;
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkReduceByOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkReduceByOperatorTest.java
new file mode 100644
index 0000000..cf68d35
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkReduceByOperatorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.function.ProjectionDescriptor;
+import org.apache.wayang.core.function.ReduceDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Test suite for {@link FlinkReduceByOperator}.
+ */
+public class FlinkReduceByOperatorTest extends FlinkOperatorTestBase{
+ @Test
+ public void testExecution() throws Exception {
+ // Prepare test data.
+ List<Tuple2<String, Integer>> inputList = Arrays.stream("aaabbccccdeefff".split(""))
+ .map(string -> new Tuple2<>(string, 1))
+ .collect(Collectors.toList());
+ DataSetChannel.Instance input = this.createDataSetChannelInstance(inputList);
+ DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+
+ // Build the reduce operator.
+ FlinkReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator =
+ new FlinkReduceByOperator<>(
+ DataSetType.createDefaultUnchecked(Tuple2.class),
+ new ProjectionDescriptor<>(
+ DataUnitType.createBasicUnchecked(Tuple2.class),
+ DataUnitType.createBasic(String.class),
+ "field0"),
+ new ReduceDescriptor<>(
+ (a, b) -> {
+ a.field1 += b.field1;
+ return a;
+ }, DataUnitType.createGroupedUnchecked(Tuple2.class),
+ DataUnitType.createBasicUnchecked(Tuple2.class)
+ ));
+
+ // Set up the ChannelInstances.
+ final ChannelInstance[] inputs = new ChannelInstance[]{input};
+ final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+ // Execute.
+ this.evaluate(reduceByOperator, inputs, outputs);
+
+ // Verify the outcome.
+ final Iterable<Tuple2<String, Integer>> result = output.<Tuple2<String, Integer>>provideDataSet().collect();
+ final Set<Tuple2<String, Integer>> resultSet = new HashSet<>();
+ result.forEach(resultSet::add);
+ final Tuple2[] expectedResults = {
+ new Tuple2<>("a", 3),
+ new Tuple2<>("b", 2),
+ new Tuple2<>("c", 4),
+ new Tuple2<>("d", 1),
+ new Tuple2<>("e", 2),
+ new Tuple2<>("f", 3)
+ };
+ Arrays.stream(expectedResults)
+ .forEach(expected -> Assert.assertTrue("Not contained: " + expected, resultSet.contains(expected)));
+ Assert.assertEquals(expectedResults.length, resultSet.size());
+
+ }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java
index 5c6da87..2bb07de 100644
--- a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java
@@ -19,6 +19,7 @@
package org.apache.wayang.flink.test;
import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.flink.channels.DataSetChannel;
@@ -30,6 +31,9 @@
import static org.mockito.Mockito.mock;
+/**
+ * Utility to create {@link Channel}s in tests.
+ */
public class ChannelFactory {
private static FlinkExecutor flinkExecutor;