Added more test for issue 134 (#224)

* Update README.md

Add notes regarding issue with JAVA_HOME variable and setting up Wayang as well as specifying (current) supported Java version.

* added partial test class for flink

* fixed flinktestbase

* added more test issue 134

* added license header

* added license header

Co-authored-by: Bertty Contreras-Rojas <bertty@databloom.ai>
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java
index 73fd91f..6c3c5e6 100644
--- a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCartesianOperatorTest.java
@@ -28,6 +28,9 @@
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Test suite for {@link FlinkCartesianOperator}.
+ */
 public class FlinkCartesianOperatorTest extends FlinkOperatorTestBase {
 
     @Test
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCollectionSourceTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCollectionSourceTest.java
new file mode 100644
index 0000000..a983ea5
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCollectionSourceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Test suite for the {@link FlinkCollectionSource}.
+ */
+public class FlinkCollectionSourceTest extends FlinkOperatorTestBase{
+    @Test
+    public void testExecution() throws Exception {
+        Set<Integer> inputValues = new HashSet<>(Arrays.asList(1, 2, 3));
+        FlinkCollectionSource<Integer> collectionSource = new FlinkCollectionSource<Integer>(
+                inputValues,
+                DataSetType.createDefault(Integer.class));
+        DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+        // Set up the ChannelInstances.
+        final ChannelInstance[] inputs = new ChannelInstance[]{};
+        final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+        // Execute.
+        this.evaluate(collectionSource, inputs, outputs);
+
+        final Set<Integer> outputValues = new HashSet<>(output.<Integer>provideDataSet().collect());
+        Assert.assertEquals(outputValues, inputValues);
+    }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCountOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCountOperatorTest.java
new file mode 100644
index 0000000..577b7d5
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkCountOperatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.xml.crypto.Data;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Test suite for {@link FlinkCountOperator}.
+ */
+public class FlinkCountOperatorTest extends FlinkOperatorTestBase{
+    @Test
+    public void testExecution() throws Exception {
+        // Prepare test data.
+        DataSetChannel.Instance input = this.createDataSetChannelInstance(Arrays.asList(1, 2, 3, 4, 5));
+        DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+        // Build the count operator.
+        FlinkCountOperator<Integer> countOperator =
+                new FlinkCountOperator<Integer>(DataSetType.createDefaultUnchecked(Integer.class));
+
+        // Set up the ChannelInstances.
+        final ChannelInstance[] inputs = new ChannelInstance[]{input};
+        final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+        // Execute.
+        this.evaluate(countOperator, inputs, outputs);
+
+        // Verify the outcome.
+        final List<Object> result = output.provideDataSet().collect();
+        Assert.assertEquals(1,result.size());
+        Assert.assertEquals(Long.valueOf(5),result.iterator().next());
+
+    }
+
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkDistinctOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkDistinctOperatorTest.java
new file mode 100644
index 0000000..6edb68e
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkDistinctOperatorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.xml.crypto.Data;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test suite for {@link FlinkDistinctOperator}.
+ */
+//The test assert error expected 4, actual 1
+public class FlinkDistinctOperatorTest extends FlinkOperatorTestBase{
+
+    @Test
+    public void testExecution() throws Exception {
+        // Prepare test data.
+        List<Integer> inputData = Arrays.asList(0, 1, 1, 6, 2, 2, 6, 6);
+
+        // Build the distinct operator.
+        FlinkDistinctOperator<Integer> distinctOperator =
+                new FlinkDistinctOperator<>(
+                        DataSetType.createDefaultUnchecked(Integer.class)
+                );
+
+        // Set up the ChannelInstances.
+        final ChannelInstance[] inputs = new ChannelInstance[]{this.createDataSetChannelInstance(inputData)};
+        final ChannelInstance[] outputs = new ChannelInstance[]{this.createDataSetChannelInstance()};
+
+        // Execute.
+        this.evaluate(distinctOperator, inputs, outputs);
+
+        // Verify the outcome.
+        final List<Integer> result = ((DataSetChannel.Instance) outputs[0]).<Integer>provideDataSet().collect();
+        for(Object e : result){
+            System.out.println(e);
+        }
+        Assert.assertEquals(4, result.size());
+        Assert.assertEquals(Arrays.asList(0, 1, 6, 2), result);
+
+    }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFilterOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFilterOperatorTest.java
new file mode 100644
index 0000000..65cbb62
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFilterOperatorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.function.PredicateDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * Test suite for {@link FlinkFilterOperator}.
+ */
+public class FlinkFilterOperatorTest extends FlinkOperatorTestBase{
+    @Test
+    public void testExecution() throws Exception {
+        // Prepare test data.
+        DataSetChannel.Instance input = this.createDataSetChannelInstance(Arrays.asList(0, 1, 1, 2, 6));
+        DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+        // Build the distinct operator.
+        FlinkFilterOperator<Integer> filterOperator =
+                new FlinkFilterOperator<>(
+                        DataSetType.createDefaultUnchecked(Integer.class),
+                        new PredicateDescriptor<>(item -> (item > 0), Integer.class)
+                );
+
+        // Set up the ChannelInstances.
+        ChannelInstance[] inputs = new ChannelInstance[]{input};
+        ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+        // Execute.
+        this.evaluate(filterOperator, inputs, outputs);
+
+        // Verify the outcome.
+        final List<Integer> result = output.<Integer>provideDataSet().collect();
+        Assert.assertEquals(4, result.size());
+        Assert.assertEquals(Arrays.asList(1, 1, 2, 6), result);
+
+    }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFlatMapOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFlatMapOperatorTest.java
new file mode 100644
index 0000000..7a7b53f
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkFlatMapOperatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.function.FlatMapDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test suite for {@link FlinkFlatMapOperator}.
+ */
+public class FlinkFlatMapOperatorTest extends FlinkOperatorTestBase{
+    @Test
+    public void testExecution() throws Exception {
+        // Prepare test data.
+        DataSetChannel.Instance input = this.createDataSetChannelInstance(Arrays.asList("one phrase", "two sentences", "three lines"));
+        DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+        FlinkFlatMapOperator<String, String> flatMapOperator = new FlinkFlatMapOperator<>(
+                DataSetType.createDefaultUnchecked(String.class),
+                DataSetType.createDefaultUnchecked(String.class),
+                new FlatMapDescriptor<>(phrase -> Arrays.asList(phrase.split(" ")), String.class, String.class)
+        );
+
+        // Set up the ChannelInstances.
+        ChannelInstance[] inputs = new ChannelInstance[]{input};
+        ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+        // Execute.
+        this.evaluate(flatMapOperator, inputs, outputs);
+
+        // Verify the outcome.
+        final List<String> result = output.<String>provideDataSet().collect();
+        Assert.assertEquals(6, result.size());
+        Assert.assertEquals(Arrays.asList("one", "phrase", "two", "sentences", "three", "lines"), result);
+
+    }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkGlobalMaterializedGroupOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkGlobalMaterializedGroupOperatorTest.java
new file mode 100644
index 0000000..1bb8362
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkGlobalMaterializedGroupOperatorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.WayangCollections;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Test suite for {@link FlinkGlobalMaterializedGroupOperator}.
+ */
+public class FlinkGlobalMaterializedGroupOperatorTest extends FlinkOperatorTestBase{
+
+    @Test
+    public void testExecution() throws Exception {
+        // Prepare test data.
+        Collection<Integer> inputCollection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+        // Build the reduce operator.
+        FlinkGlobalMaterializedGroupOperator<Integer> globalGroup =
+                new FlinkGlobalMaterializedGroupOperator<>(
+                        DataSetType.createDefaultUnchecked(Integer.class),
+                        DataSetType.createGroupedUnchecked(Iterable.class)
+                );
+
+        // Execute.
+        ChannelInstance[] inputs = new DataSetChannel.Instance[]{this.createDataSetChannelInstance(inputCollection)};
+        ChannelInstance[] outputs = new DataSetChannel.Instance[]{this.createDataSetChannelInstance()};
+        this.evaluate(globalGroup, inputs, outputs);
+
+        // Verify the outcome.
+        final Collection<Iterable<Integer>> result = ((DataSetChannel.Instance) outputs[0]).<Iterable<Integer>>provideDataSet().collect();
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals(inputCollection, result.iterator().next());
+
+    }
+
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkJoinOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkJoinOperatorTest.java
new file mode 100644
index 0000000..d4b0e4b
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkJoinOperatorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.function.ProjectionDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+//problematic
+/**
+ * Test suite for {@link FlinkJoinOperator}.
+ */
+public class FlinkJoinOperatorTest extends FlinkOperatorTestBase{
+    @Test
+    public void testExecution() throws Exception {
+        // Prepare test data.
+        DataSetChannel.Instance input0 = this.createDataSetChannelInstance(Arrays.asList(
+                new Tuple2<>(1, "b"), new Tuple2<>(1, "c"), new Tuple2<>(2, "d"), new Tuple2<>(3, "e")));
+        DataSetChannel.Instance input1 = this.createDataSetChannelInstance(Arrays.asList(
+                new Tuple2<>("x", 1), new Tuple2<>("y", 1), new Tuple2<>("z", 2), new Tuple2<>("w", 4)));
+        DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+        // Build the Cartesian operator.
+        FlinkJoinOperator<Tuple2, Tuple2, Integer> join =
+                new FlinkJoinOperator<>(
+                        DataSetType.createDefaultUnchecked(Tuple2.class),
+                        DataSetType.createDefaultUnchecked(Tuple2.class),
+                        new ProjectionDescriptor<>(
+                                DataUnitType.createBasicUnchecked(Tuple2.class),
+                                DataUnitType.createBasic(Integer.class),
+                                "field0"),
+                        new ProjectionDescriptor<>(
+                                DataUnitType.createBasicUnchecked(Tuple2.class),
+                                DataUnitType.createBasic(Integer.class),
+                                "field1"));
+
+        // Set up the ChannelInstances.
+        final ChannelInstance[] inputs = new ChannelInstance[]{input0, input1};
+        final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+        // Execute.
+        this.evaluate(join, inputs, outputs);
+
+        // Verify the outcome.
+        final List<Tuple2<Tuple2<Integer, String>, Tuple2<String, Integer>>> result =
+                output.<Tuple2<Tuple2<Integer, String>, Tuple2<String, Integer>>>provideDataSet().collect();
+        Assert.assertEquals(5, result.size());
+        Assert.assertEquals(result.get(0), new Tuple2<>(new Tuple2<>(1, "b"), new Tuple2<>("x", 1)));
+        Assert.assertEquals(result.get(1), new Tuple2<>(new Tuple2<>(1, "b"), new Tuple2<>("y", 1)));
+        Assert.assertEquals(result.get(2), new Tuple2<>(new Tuple2<>(1, "c"), new Tuple2<>("x", 1)));
+        Assert.assertEquals(result.get(3), new Tuple2<>(new Tuple2<>(1, "c"), new Tuple2<>("y", 1)));
+        Assert.assertEquals(result.get(4), new Tuple2<>(new Tuple2<>(2, "d"), new Tuple2<>("z", 2)));
+
+
+    }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
index e926b0a..6bf1a0e 100644
--- a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
@@ -40,6 +40,9 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Test base for {@link FlinkExecutionOperator} tests.
+ */
 public class FlinkOperatorTestBase {
 
     protected Configuration configuration;
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkReduceByOperatorTest.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkReduceByOperatorTest.java
new file mode 100644
index 0000000..cf68d35
--- /dev/null
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/operators/FlinkReduceByOperatorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.function.ProjectionDescriptor;
+import org.apache.wayang.core.function.ReduceDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Test suite for {@link FlinkReduceByOperator}.
+ */
+public class FlinkReduceByOperatorTest extends FlinkOperatorTestBase{
+    @Test
+    public void testExecution() throws Exception {
+        // Prepare test data.
+        List<Tuple2<String, Integer>> inputList = Arrays.stream("aaabbccccdeefff".split(""))
+                .map(string -> new Tuple2<>(string, 1))
+                .collect(Collectors.toList());
+        DataSetChannel.Instance input = this.createDataSetChannelInstance(inputList);
+        DataSetChannel.Instance output = this.createDataSetChannelInstance();
+
+
+        // Build the reduce operator.
+        FlinkReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator =
+                new FlinkReduceByOperator<>(
+                        DataSetType.createDefaultUnchecked(Tuple2.class),
+                        new ProjectionDescriptor<>(
+                                DataUnitType.createBasicUnchecked(Tuple2.class),
+                                DataUnitType.createBasic(String.class),
+                                "field0"),
+                        new ReduceDescriptor<>(
+                                (a, b) -> {
+                                    a.field1 += b.field1;
+                                    return a;
+                                }, DataUnitType.createGroupedUnchecked(Tuple2.class),
+                                DataUnitType.createBasicUnchecked(Tuple2.class)
+                        ));
+
+        // Set up the ChannelInstances.
+        final ChannelInstance[] inputs = new ChannelInstance[]{input};
+        final ChannelInstance[] outputs = new ChannelInstance[]{output};
+
+        // Execute.
+        this.evaluate(reduceByOperator, inputs, outputs);
+
+        // Verify the outcome.
+        final Iterable<Tuple2<String, Integer>> result = output.<Tuple2<String, Integer>>provideDataSet().collect();
+        final Set<Tuple2<String, Integer>> resultSet = new HashSet<>();
+        result.forEach(resultSet::add);
+        final Tuple2[] expectedResults = {
+                new Tuple2<>("a", 3),
+                new Tuple2<>("b", 2),
+                new Tuple2<>("c", 4),
+                new Tuple2<>("d", 1),
+                new Tuple2<>("e", 2),
+                new Tuple2<>("f", 3)
+        };
+        Arrays.stream(expectedResults)
+                .forEach(expected -> Assert.assertTrue("Not contained: " + expected, resultSet.contains(expected)));
+        Assert.assertEquals(expectedResults.length, resultSet.size());
+
+    }
+}
diff --git a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java
index 5c6da87..2bb07de 100644
--- a/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java
+++ b/wayang-platforms/wayang-flink/code/test/java/org/apache/wayang/flink/test/ChannelFactory.java
@@ -19,6 +19,7 @@
 package org.apache.wayang.flink.test;
 
 import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.plan.executionplan.Channel;
 import org.apache.wayang.core.platform.ChannelDescriptor;
 import org.apache.wayang.core.util.WayangCollections;
 import org.apache.wayang.flink.channels.DataSetChannel;
@@ -30,6 +31,9 @@
 
 import static org.mockito.Mockito.mock;
 
+/**
+ * Utility to create {@link Channel}s in tests.
+ */
 public class ChannelFactory {
 
     private static FlinkExecutor flinkExecutor;