blob: 0e5c72b67a5856e69ecd2bcbd64278bb0ea7c54c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.operators.util.CollectionDataSets.CrazyNested;
import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.operators.util.CollectionDataSets.FromTupleWithCTor;
import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
import org.apache.flink.test.operators.util.CollectionDataSets.PojoContainingTupleAndWritable;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import scala.math.BigInt;
/**
* Integration tests for {@link GroupReduceFunction}, {@link RichGroupReduceFunction}, and {@link
* GroupCombineFunction}.
*/
@SuppressWarnings({"serial", "unchecked", "UnusedDeclaration"})
@RunWith(Parameterized.class)
public class GroupReduceITCase extends MultipleProgramsTestBase {
/**
 * Creates the test instance for one parameterized execution mode.
 *
 * @param executionMode the mode handed in by the {@link Parameterized} runner
 */
public GroupReduceITCase(TestExecutionMode executionMode) {
    super(executionMode);
}
@Test
public void testCorrectnessofGroupReduceOnTupleContainingPrimitiveByteArrayWithKeyFieldSelectors()
        throws Exception {
    // Group on field position 0 (the byte[] field) and reduce each group with ByteArrayGroupReduce.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple2<byte[], Integer>> byteArrayTuples =
            CollectionDataSets.getTuple2WithByteArrayDataSet(environment);
    final DataSet<Integer> groupSums =
            byteArrayTuples.groupBy(0).reduceGroup(new ByteArrayGroupReduce());
    final String expected = "0\n" + "1\n" + "2\n" + "3\n" + "4\n";
    compareResultAsText(groupSums.collect(), expected);
}
/** Emits one value per group: the sum of the {@code f1} (Integer) fields of its tuples. */
private static class ByteArrayGroupReduce
        implements GroupReduceFunction<Tuple2<byte[], Integer>, Integer> {
    @Override
    public void reduce(Iterable<Tuple2<byte[], Integer>> values, Collector<Integer> out)
            throws Exception {
        int total = 0;
        final Iterator<Tuple2<byte[], Integer>> it = values.iterator();
        while (it.hasNext()) {
            total += it.next().f1;
        }
        out.collect(total);
    }
}
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() throws Exception {
    // Group the 3-tuple data set on field position 1 and reduce each group with
    // Tuple3GroupReduce.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple2<Integer, Long>> reduced =
            tuples.groupBy(1).reduceGroup(new Tuple3GroupReduce());
    final String expected = "1,1\n" + "5,2\n" + "15,3\n" + "34,4\n" + "65,5\n" + "111,6\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelectors()
        throws Exception {
    // Composite grouping key built from field positions 4 and 0 of the 5-tuple data set.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples =
            CollectionDataSets.get5TupleDataSet(environment);
    final DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduced =
            tuples.groupBy(4, 0).reduceGroup(new Tuple5GroupReduce());
    final String expected =
            "1,1,0,P-),1\n"
                    + "2,3,0,P-),1\n"
                    + "2,2,0,P-),2\n"
                    + "3,9,0,P-),2\n"
                    + "3,6,0,P-),3\n"
                    + "4,17,0,P-),1\n"
                    + "4,17,0,P-),2\n"
                    + "5,11,0,P-),1\n"
                    + "5,29,0,P-),2\n"
                    + "5,25,0,P-),3\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting()
        throws Exception {
    // Group on field 1, sort each group ascending on field 2, then reduce with
    // Tuple3SortedGroupReduce. Default parallelism is fixed to 1 for this program.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.groupBy(1)
                    .sortGroup(2, Order.ASCENDING)
                    .reduceGroup(new Tuple3SortedGroupReduce());
    final String expected =
            "1,1,Hi\n"
                    + "5,2,Hello-Hello world\n"
                    + "15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n"
                    + "34,4,Comment#1-Comment#2-Comment#3-Comment#4\n"
                    + "65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n"
                    + "111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor() throws Exception {
    // Same grouping as the field-position variant, but the key comes from a KeySelector.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple2<Integer, Long>> reduced =
            tuples.groupBy(new KeySelector1()).reduceGroup(new Tuple3GroupReduce());
    final String expected = "1,1\n" + "5,2\n" + "15,3\n" + "34,4\n" + "65,5\n" + "111,6\n";
    compareResultAsTuples(reduced.collect(), expected);
}
/** Uses the {@code f1} (Long) field of a 3-tuple as the grouping key. */
private static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long getKey(Tuple3<Integer, Long, String> tuple) {
        final Long key = tuple.f1;
        return key;
    }
}
@Test
public void testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor() throws Exception {
    // Group CustomType records by the key that KeySelector2 extracts, then reduce each group
    // with CustomTypeGroupReduce.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<CustomType> customs = CollectionDataSets.getCustomTypeDataSet(environment);
    final DataSet<CustomType> reduced =
            customs.groupBy(new KeySelector2()).reduceGroup(new CustomTypeGroupReduce());
    final String expected =
            "1,0,Hello!\n"
                    + "2,3,Hello!\n"
                    + "3,12,Hello!\n"
                    + "4,30,Hello!\n"
                    + "5,60,Hello!\n"
                    + "6,105,Hello!\n";
    compareResultAsText(reduced.collect(), expected);
}
/** Uses {@code CustomType.myInt} as the grouping key. */
private static class KeySelector2 implements KeySelector<CustomType, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer getKey(CustomType record) {
        final Integer key = record.myInt;
        return key;
    }
}
@Test
public void testCorrectnessOfAllGroupReduceForTuples() throws Exception {
    // No groupBy: the whole 3-tuple data set is reduced as a single group.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.reduceGroup(new AllAddingTuple3GroupReduce());
    compareResultAsTuples(reduced.collect(), "231,91,Hello World\n");
}
@Test
public void testCorrectnessOfAllGroupReduceForCustomTypes() throws Exception {
    // No groupBy: every CustomType record is reduced as a single group.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<CustomType> customs = CollectionDataSets.getCustomTypeDataSet(environment);
    final DataSet<CustomType> reduced = customs.reduceGroup(new AllAddingCustomTypeGroupReduce());
    compareResultAsText(reduced.collect(), "91,210,Hello!");
}
@Test
public void testCorrectnessOfGroupReduceWithBroadcastSet() throws Exception {
    // The integer data set is attached to the group reducer as a broadcast set named "ints".
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Integer> broadcastInts = CollectionDataSets.getIntegerDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.groupBy(1)
                    .reduceGroup(new BCTuple3GroupReduce())
                    .withBroadcastSet(broadcastInts, "ints");
    final String expected =
            "1,1,55\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfGroupReduceIfUDFReturnsInputObjectsMultipleTimesWhileChangingThem()
        throws Exception {
    // The reducer (InputReturningTuple3GroupReduce) emits input objects more than once and
    // modifies them between emissions; the collected output must reflect each emission.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
    final String expected =
            "11,1,Hi!\n"
                    + "21,1,Hi again!\n"
                    + "12,2,Hi!\n"
                    + "22,2,Hi again!\n"
                    + "13,2,Hi!\n"
                    + "23,2,Hi again!\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine()
        throws Exception {
    // Skipped in COLLECTION mode; otherwise groups CustomType records via KeySelector3 and
    // reduces them with a combinable reducer.
    org.junit.Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<CustomType> customs = CollectionDataSets.getCustomTypeDataSet(environment);
    final DataSet<CustomType> reduced =
            customs.groupBy(new KeySelector3())
                    .reduceGroup(new CustomTypeGroupReduceWithCombine());
    final String expected =
            "1,0,test1\n"
                    + "2,3,test2\n"
                    + "3,12,test3\n"
                    + "4,30,test4\n"
                    + "5,60,test5\n"
                    + "6,105,test6\n";
    compareResultAsText(reduced.collect(), expected);
}
/** Uses {@code CustomType.myInt} as the grouping key (same key as KeySelector2). */
private static class KeySelector3 implements KeySelector<CustomType, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer getKey(CustomType record) {
        final Integer key = record.myInt;
        return key;
    }
}
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithCombine() throws Exception {
    // Skipped in COLLECTION mode; groups 3-tuples on field 1 with a combinable reducer.
    org.junit.Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    // Parallelism matters here: it determines how often the combiner is invoked.
    environment.setParallelism(2);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple2<Integer, String>> reduced =
            tuples.groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
    final String expected =
            "1,test1\n"
                    + "5,test2\n"
                    + "15,test3\n"
                    + "34,test4\n"
                    + "65,test5\n"
                    + "111,test6\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfAllGroupReduceForTuplesWithCombine() throws Exception {
    // Skipped in COLLECTION mode; an all-group reduce with combiner over input produced at
    // parallelism 4, with a repartition ship-strategy hint passed via parameters.
    org.junit.Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment)
                    .map(new IdentityMapper<Tuple3<Integer, Long, String>>())
                    .setParallelism(4);

    final Configuration hints = new Configuration();
    hints.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);
    final DataSet<Tuple2<Integer, String>> reduced =
            tuples.reduceGroup(new Tuple3AllGroupReduceWithCombine()).withParameters(hints);

    final String expected =
            "322,"
                    + "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Exception {
    // Group on field 1, sort each group descending on field position 2.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.groupBy(1)
                    .sortGroup(2, Order.DESCENDING)
                    .reduceGroup(new Tuple3SortedGroupReduce());
    final String expected =
            "1,1,Hi\n"
                    + "5,2,Hello world-Hello\n"
                    + "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n"
                    + "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n"
                    + "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n"
                    + "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector()
        throws Exception {
    // KeySelector4 returns a Tuple2 (f0, f4) as a composite key; the result must match the
    // groupBy(4, 0) field-position variant.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples =
            CollectionDataSets.get5TupleDataSet(environment);
    final DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduced =
            tuples.groupBy(new KeySelector4()).reduceGroup(new Tuple5GroupReduce());
    final String expected =
            "1,1,0,P-),1\n"
                    + "2,3,0,P-),1\n"
                    + "2,2,0,P-),2\n"
                    + "3,9,0,P-),2\n"
                    + "3,6,0,P-),3\n"
                    + "4,17,0,P-),1\n"
                    + "4,17,0,P-),2\n"
                    + "5,11,0,P-),1\n"
                    + "5,29,0,P-),2\n"
                    + "5,25,0,P-),3\n";
    compareResultAsTuples(reduced.collect(), expected);
}
/** Builds a composite {@code (f0, f4)} key from a 5-tuple. */
private static class KeySelector4
        implements KeySelector<
                Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
        final Integer leading = t.f0;
        final Long trailing = t.f4;
        return new Tuple2<>(leading, trailing);
    }
}
@Test
public void testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting()
        throws Exception {
    // With a group sort on field 0, the combiner's input must also be sorted;
    // OrderCheckingCombinableReduce is the reducer used to exercise this.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.groupBy(1)
                    .sortGroup(0, Order.ASCENDING)
                    .reduceGroup(new OrderCheckingCombinableReduce());
    final String expected =
            "1,1,Hi\n"
                    + "2,2,Hello\n"
                    + "4,3,Hello world, how are you?\n"
                    + "7,4,Comment#1\n"
                    + "11,5,Comment#5\n"
                    + "16,6,Comment#10\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testDeepNesting() throws Exception {
    // Groups on a deeply nested POJO field path; the original comment also notes a null value
    // inside the POJO data.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<CrazyNested> nested = CollectionDataSets.getCrazyNestedDataSet(environment);
    final DataSet<Tuple2<String, Integer>> counts =
            nested.groupBy("nestLvl1.nestLvl2.nestLvl3.nestLvl4.f1nal")
                    .reduceGroup(new GroupReducer1());
    compareResultAsTuples(counts.collect(), "aa,1\nbb,2\ncc,3\n");
}
/**
 * Emits, per group, the {@code nestLvl1.nestLvl2.nestLvl3.nestLvl4.f1nal} value of the last
 * element seen together with the number of elements in the group.
 */
private static class GroupReducer1
        implements GroupReduceFunction<CollectionDataSets.CrazyNested, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(Iterable<CrazyNested> values, Collector<Tuple2<String, Integer>> out)
            throws Exception {
        int groupSize = 0;
        String lastKey = null;
        for (CrazyNested element : values) {
            lastKey = element.nestLvl1.nestLvl2.nestLvl3.nestLvl4.f1nal;
            groupSize++;
        }
        out.collect(new Tuple2<>(lastKey, groupSize));
    }
}
@Test
public void testPojoExtendingFromTupleWithCustomFields() throws Exception {
    // A POJO that extends a tuple and adds custom fields, grouped on one custom field ("special")
    // and one inherited tuple field ("f2").
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<FromTupleWithCTor> pojos =
            CollectionDataSets.getPojoExtendingFromTuple(environment);
    final DataSet<Integer> groupSizes =
            pojos.groupBy("special", "f2").reduceGroup(new GroupReducer2());
    compareResultAsText(groupSizes.collect(), "3\n2\n");
}
/** Emits the value of {@code countElements} for each group of FromTupleWithCTor records. */
private static class GroupReducer2 implements GroupReduceFunction<FromTupleWithCTor, Integer> {
    @Override
    public void reduce(Iterable<FromTupleWithCTor> values, Collector<Integer> out) {
        final Integer groupSize = countElements(values);
        out.collect(groupSize);
    }
}
@Test
public void testPojoContainigWritableAndTuples() throws Exception {
    // Groups POJOs that contain a Writable and a tuple, keyed on "hadoopFan" plus the full
    // nested tuple ("theTuple.*").
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<PojoContainingTupleAndWritable> pojos =
            CollectionDataSets.getPojoContainingTupleAndWritable(environment);
    final DataSet<Integer> groupSizes =
            pojos.groupBy("hadoopFan", "theTuple.*").reduceGroup(new GroupReducer3());
    compareResultAsText(groupSizes.collect(), "1\n5\n");
}
/** Emits the value of {@code countElements} for each group of PojoContainingTupleAndWritable. */
private static class GroupReducer3
        implements GroupReduceFunction<PojoContainingTupleAndWritable, Integer> {
    @Override
    public void reduce(
            Iterable<PojoContainingTupleAndWritable> values, Collector<Integer> out) {
        final Integer groupSize = countElements(values);
        out.collect(groupSize);
    }
}
@Test
public void testTupleContainingPojosAndRegularFields() throws Exception {
    // A tuple holding POJOs next to a plain field; grouped on "f0" plus the full nested
    // selection "f1.*".
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, CrazyNested, POJO>> tuples =
            CollectionDataSets.getTupleContainingPojos(environment);
    final DataSet<Integer> groupSizes =
            tuples.groupBy("f0", "f1.*").reduceGroup(new GroupReducer4());
    compareResultAsText(groupSizes.collect(), "3\n1\n");
}
/** Emits the value of {@code countElements} for each group of (Integer, CrazyNested, POJO). */
private static class GroupReducer4
        implements GroupReduceFunction<Tuple3<Integer, CrazyNested, POJO>, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, CrazyNested, POJO>> values, Collector<Integer> out) {
        final Integer groupSize = countElements(values);
        out.collect(groupSize);
    }
}
@Test
public void testStringBasedDefinitionOnGroupSort() throws Exception {
    // Same program as the descending-group-sort test, but the sort key is given by name ("f2").
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.groupBy(1)
                    .sortGroup("f2", Order.DESCENDING)
                    .reduceGroup(new Tuple3SortedGroupReduce());
    final String expected =
            "1,1,Hi\n"
                    + "5,2,Hello world-Hello\n"
                    + "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n"
                    + "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n"
                    + "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n"
                    + "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
    compareResultAsTuples(reduced.collect(), expected);
}
@Test
public void testIntBasedDefinitionOnGroupSortForFullNestedTuple() throws Exception {
    // Sort each group descending on position 0, which is the whole nested Tuple2.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple2<Tuple2<Integer, Integer>, String>> nestedTuples =
            CollectionDataSets.getGroupSortedNestedTupleDataSet(environment);
    final DataSet<String> reduced =
            nestedTuples
                    .groupBy("f1")
                    .sortGroup(0, Order.DESCENDING)
                    .reduceGroup(new NestedTupleReducer());
    final String expected =
            "a--(2,1)-(1,3)-(1,2)-\n" + "b--(2,2)-\n" + "c--(4,9)-(3,6)-(3,3)-\n";
    compareResultAsText(reduced.collect(), expected);
}
@Test
public void testIntBasedDefinitionOnGroupSortForPartialNestedTuple() throws Exception {
    // Sort each group ascending on the nested fields "f0.f0", then "f0.f1".
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple2<Tuple2<Integer, Integer>, String>> nestedTuples =
            CollectionDataSets.getGroupSortedNestedTupleDataSet(environment);
    final DataSet<String> reduced =
            nestedTuples
                    .groupBy("f1")
                    .sortGroup("f0.f0", Order.ASCENDING)
                    .sortGroup("f0.f1", Order.ASCENDING)
                    .reduceGroup(new NestedTupleReducer());
    final String expected =
            "a--(1,2)-(1,3)-(2,1)-\n" + "b--(2,2)-\n" + "c--(3,3)-(3,6)-(4,9)-\n";
    compareResultAsText(reduced.collect(), expected);
}
@Test
public void testStringBasedDefinitionOnGroupSortForPartialNestedTuple() throws Exception {
    // Sort each group descending on the nested field "f0.f0" only.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple2<Tuple2<Integer, Integer>, String>> nestedTuples =
            CollectionDataSets.getGroupSortedNestedTupleDataSet(environment);
    final DataSet<String> reduced =
            nestedTuples
                    .groupBy("f1")
                    .sortGroup("f0.f0", Order.DESCENDING)
                    .reduceGroup(new NestedTupleReducer());
    final String expected =
            "a--(2,1)-(1,3)-(1,2)-\n" + "b--(2,2)-\n" + "c--(4,9)-(3,3)-(3,6)-\n";
    compareResultAsText(reduced.collect(), expected);
}
@Test
public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeys() throws Exception {
    // Two string-defined sort keys ("f0.f0" then "f0.f1", both descending) within each "f1"
    // group.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple2<Tuple2<Integer, Integer>, String>> nestedTuples =
            CollectionDataSets.getGroupSortedNestedTupleDataSet(environment);
    final DataSet<String> reduced =
            nestedTuples
                    .groupBy("f1")
                    .sortGroup("f0.f0", Order.DESCENDING)
                    .sortGroup("f0.f1", Order.DESCENDING)
                    .reduceGroup(new NestedTupleReducer());
    final String expected =
            "a--(2,1)-(1,3)-(1,2)-\n" + "b--(2,2)-\n" + "c--(4,9)-(3,6)-(3,3)-\n";
    compareResultAsText(reduced.collect(), expected);
}
@Test
public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos() throws Exception {
    // POJO variant: group on "hadoopFan", then sort descending on "theTuple.f0" and
    // "theTuple.f1".
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<PojoContainingTupleAndWritable> pojos =
            CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(environment);
    final DataSet<String> reduced =
            pojos.groupBy("hadoopFan")
                    .sortGroup("theTuple.f0", Order.DESCENDING)
                    .sortGroup("theTuple.f1", Order.DESCENDING)
                    .reduceGroup(new GroupReducer5());
    final String expected =
            "1---(10,100)-\n" + "2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
    compareResultAsText(reduced.collect(), expected);
}
@Test
public void testTupleKeySelectorGroupSort() throws Exception {
    // Both the grouping key (field 1) and the descending sort key (field 2) are KeySelectors.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple3<Integer, Long, String>> reduced =
            tuples.groupBy(new LongFieldExtractor<Tuple3<Integer, Long, String>>(1))
                    .sortGroup(
                            new StringFieldExtractor<Tuple3<Integer, Long, String>>(2),
                            Order.DESCENDING)
                    .reduceGroup(new Tuple3SortedGroupReduce());
    final String expected =
            "1,1,Hi\n"
                    + "5,2,Hello world-Hello\n"
                    + "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n"
                    + "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n"
                    + "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n"
                    + "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
    compareResultAsTuples(reduced.collect(), expected);
}
/** Builds a composite key {@code (myInt, myInt)} from a CustomType record. */
private static class TwoTuplePojoExtractor
        implements KeySelector<CustomType, Tuple2<Integer, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Integer, Integer> getKey(CustomType value) throws Exception {
        final Integer component = value.myInt;
        return new Tuple2<>(component, component);
    }
}
/** Uses {@code CustomType.myString} as the key. */
private static class StringPojoExtractor implements KeySelector<CustomType, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String getKey(CustomType value) throws Exception {
        final String key = value.myString;
        return key;
    }
}
@Test
public void testPojoKeySelectorGroupSort() throws Exception {
    // CustomType records grouped by a tuple-returning KeySelector and sorted descending by a
    // String-returning KeySelector.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<CustomType> customs = CollectionDataSets.getCustomTypeDataSet(environment);
    final DataSet<CustomType> reduced =
            customs.groupBy(new TwoTuplePojoExtractor())
                    .sortGroup(new StringPojoExtractor(), Order.DESCENDING)
                    .reduceGroup(new CustomTypeSortedGroupReduce());
    final String expected =
            "1,0,Hi\n"
                    + "2,3,Hello world-Hello\n"
                    + "3,12,Luke Skywalker-I am fine.-Hello world, how are you?\n"
                    + "4,30,Comment#4-Comment#3-Comment#2-Comment#1\n"
                    + "5,60,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n"
                    + "6,105,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
    compareResultAsText(reduced.collect(), expected);
}
/** Returns the tuple field at a configured position, typed as {@code Long}. */
private static class LongFieldExtractor<T extends Tuple> implements KeySelector<T, Long> {
    private static final long serialVersionUID = 1L;

    private int field;

    public LongFieldExtractor() {}

    public LongFieldExtractor(int fieldIndex) {
        this.field = fieldIndex;
    }

    @Override
    public Long getKey(T t) throws Exception {
        // T is bounded by Tuple, so no cast is needed to reach getField.
        final Long key = t.getField(field);
        return key;
    }
}
/** Returns the tuple field at a configured position, typed as {@code Integer}. */
private static class IntFieldExtractor<T extends Tuple> implements KeySelector<T, Integer> {
    private static final long serialVersionUID = 1L;

    private int field;

    public IntFieldExtractor() {}

    public IntFieldExtractor(int fieldIndex) {
        this.field = fieldIndex;
    }

    @Override
    public Integer getKey(T t) throws Exception {
        final Integer key = t.getField(field);
        return key;
    }
}
/** Returns the tuple field at a configured position, typed as {@code String}. */
private static class StringFieldExtractor<T extends Tuple> implements KeySelector<T, String> {
    private static final long serialVersionUID = 1L;

    private int field;

    public StringFieldExtractor() {}

    public StringFieldExtractor(int fieldIndex) {
        this.field = fieldIndex;
    }

    @Override
    public String getKey(T t) throws Exception {
        final String key = t.getField(field);
        return key;
    }
}
@Test
public void testTupleKeySelectorSortWithCombine() throws Exception {
    // Sorted group reduce with combiner, using KeySelectors for both grouping (field 1) and
    // descending sorting (field 2).
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple3<Integer, Long, String>> tuples =
            CollectionDataSets.get3TupleDataSet(environment);
    final DataSet<Tuple2<Integer, String>> reduced =
            tuples.groupBy(new LongFieldExtractor<Tuple3<Integer, Long, String>>(1))
                    .sortGroup(
                            new StringFieldExtractor<Tuple3<Integer, Long, String>>(2),
                            Order.DESCENDING)
                    .reduceGroup(new Tuple3SortedGroupReduceWithCombine());
    final List<Tuple2<Integer, String>> actual = reduced.collect();

    // In COLLECTION mode the program still runs, but its output is not verified.
    if (mode == TestExecutionMode.COLLECTION) {
        return;
    }
    final String expected =
            "1,Hi\n"
                    + "5,Hello world-Hello\n"
                    + "15,Luke Skywalker-I am fine.-Hello world, how are you?\n"
                    + "34,Comment#4-Comment#3-Comment#2-Comment#1\n"
                    + "65,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n"
                    + "111,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
    compareResultAsTuples(actual, expected);
}
/** Builds a composite {@code (f4, f2)} key from a 5-tuple. */
private static class FiveToTwoTupleExtractor
        implements KeySelector<
                Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Long, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, Integer, String, Long> in) {
        final Long primary = in.f4;
        final Integer secondary = in.f2;
        return new Tuple2<>(primary, secondary);
    }
}
@Test
public void testTupleKeySelectorSortCombineOnTuple() throws Exception {
    // Sorted group reduce on 5-tuples: grouped by field 0 via IntFieldExtractor and sorted
    // descending by the Tuple2 key that FiveToTwoTupleExtractor returns.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples =
            CollectionDataSets.get5TupleDataSet(environment);
    final DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduced =
            tuples.groupBy(
                            new IntFieldExtractor<Tuple5<Integer, Long, Integer, String, Long>>(
                                    0))
                    .sortGroup(new FiveToTwoTupleExtractor(), Order.DESCENDING)
                    .reduceGroup(new Tuple5SortedGroupReduce());
    final String expected =
            "1,1,0,Hallo,1\n"
                    + "2,5,0,Hallo Welt-Hallo Welt wie,1\n"
                    + "3,15,0,BCD-ABC-Hallo Welt wie gehts?,2\n"
                    + "4,34,0,FGH-CDE-EFG-DEF,1\n"
                    + "5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n";
    compareResultAsTuples(reduced.collect(), expected);
}
/**
 * Concatenates a group into one string. The string is {@code hadoopFan.get()} of the first
 * element followed by "---", then each element's {@code theTuple} followed by "-".
 */
private static class GroupReducer5
        implements GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String> {
    @Override
    public void reduce(Iterable<PojoContainingTupleAndWritable> values, Collector<String> out)
            throws Exception {
        final StringBuilder line = new StringBuilder();
        boolean first = true;
        for (PojoContainingTupleAndWritable pojo : values) {
            if (first) {
                line.append(pojo.hadoopFan.get()).append("---");
                first = false;
            }
            line.append(pojo.theTuple).append("-");
        }
        out.collect(line.toString());
    }
}
@Test
public void testGroupingWithPojoContainingMultiplePojos() throws Exception {
    // Regression test: grouping on a field ("p2.a2") of a POJO that nests several POJOs.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<CollectionDataSets.PojoWithMultiplePojos> pojos =
            CollectionDataSets.getPojoWithMultiplePojos(environment);
    final DataSet<String> reduced = pojos.groupBy("p2.a2").reduceGroup(new GroupReducer6());
    compareResultAsText(reduced.collect(), "b\nccc\nee\n");
}
/** Concatenates the {@code p2.a2} values of all elements in a group. */
private static class GroupReducer6
        implements GroupReduceFunction<CollectionDataSets.PojoWithMultiplePojos, String> {
    @Override
    public void reduce(
            Iterable<CollectionDataSets.PojoWithMultiplePojos> values, Collector<String> out)
            throws Exception {
        final StringBuilder joined = new StringBuilder();
        for (CollectionDataSets.PojoWithMultiplePojos pojo : values) {
            joined.append(pojo.p2.a2);
        }
        out.collect(joined.toString());
    }
}
@Test
public void testJavaCollectionsWithinPojos() throws Exception {
    // POJOs containing Java collections (the original comment notes this exercises Kryo),
    // grouped on "key".
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final DataSet<CollectionDataSets.PojoWithCollection> pojos =
            CollectionDataSets.getPojoWithCollection(environment);
    final DataSet<String> reduced = pojos.groupBy("key").reduceGroup(new GroupReducer7());
    final String expected =
            "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
    compareResultAsText(reduced.collect(), expected);
}
/**
 * Renders a group as "call" followed, per element, by "For key &lt;key&gt; we got: " and a
 * "pojo.a=&lt;a&gt;" entry for every nested Pojo1.
 */
private static class GroupReducer7
        implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
    @Override
    public void reduce(
            Iterable<CollectionDataSets.PojoWithCollection> values, Collector<String> out) {
        final StringBuilder rendered = new StringBuilder("call");
        for (CollectionDataSets.PojoWithCollection outer : values) {
            rendered.append("For key ").append(outer.key).append(" we got: ");
            for (CollectionDataSets.Pojo1 inner : outer.pojos) {
                rendered.append("pojo.a=").append(inner.a);
            }
        }
        out.collect(rendered.toString());
    }
}
@Test
public void testGroupByGenericType() throws Exception {
    /*
     * Group by generic type
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataSet<CollectionDataSets.PojoWithCollection> ds =
            CollectionDataSets.getPojoWithCollection(env);

    // group on the bigInt field (the "generic type" key this test is about)
    DataSet<String> reduceDs = ds.groupBy("bigInt").reduceGroup(new GroupReducer8());
    List<String> result = reduceDs.collect();

    ExecutionConfig ec = env.getConfig();

    // check if automatic type registration with Kryo worked
    Assert.assertTrue(ec.getRegisteredKryoTypes().contains(BigInt.class));
    Assert.assertFalse(ec.getRegisteredKryoTypes().contains(java.sql.Date.class));

    String localExpected =
            "[call\n"
                    + "For key 92233720368547758070 we got:\n"
                    + "PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n"
                    + "For key 92233720368547758070 we got:\n"
                    + "PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}]";
    Assert.assertEquals(localExpected, result.toString());
}
@Test
public void testGroupReduceSelectorKeysWithSemProps() throws Exception {
    /*
     * Semantic properties declared on a group-reduce that follows a selector-key grouping
     * must be adapted correctly.
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataSet<Tuple5<Integer, Long, Integer, String, Long>> input =
            CollectionDataSets.get5TupleDataSet(env);

    // Step 1: group by a computed selector key, pass every record through unchanged and
    // declare field 0 as forwarded.
    DataSet<Tuple5<Integer, Long, Integer, String, Long>> passedThrough =
            input.groupBy(
                            new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Long>() {
                                @Override
                                public Long getKey(Tuple5<Integer, Long, Integer, String, Long> t)
                                        throws Exception {
                                    long left = t.f0 * t.f1;
                                    long right = t.f2 * t.f4;
                                    return left - right;
                                }
                            })
                    .reduceGroup(
                            new GroupReduceFunction<
                                    Tuple5<Integer, Long, Integer, String, Long>,
                                    Tuple5<Integer, Long, Integer, String, Long>>() {
                                @Override
                                public void reduce(
                                        Iterable<Tuple5<Integer, Long, Integer, String, Long>>
                                                group,
                                        Collector<Tuple5<Integer, Long, Integer, String, Long>>
                                                out)
                                        throws Exception {
                                    for (Tuple5<Integer, Long, Integer, String, Long> t : group) {
                                        out.collect(t);
                                    }
                                }
                            })
                    .withForwardedFields("0");

    // Step 2: group again on field 0 and sum field 1 per group.
    DataSet<Tuple2<Integer, Long>> reduceDs =
            passedThrough
                    .groupBy(0)
                    .reduceGroup(
                            new GroupReduceFunction<
                                    Tuple5<Integer, Long, Integer, String, Long>,
                                    Tuple2<Integer, Long>>() {
                                @Override
                                public void reduce(
                                        Iterable<Tuple5<Integer, Long, Integer, String, Long>>
                                                group,
                                        Collector<Tuple2<Integer, Long>> out)
                                        throws Exception {
                                    int key = 0;
                                    long sum = 0;
                                    for (Tuple5<Integer, Long, Integer, String, Long> t : group) {
                                        key = t.f0;
                                        sum += t.f1;
                                    }
                                    out.collect(new Tuple2<>(key, sum));
                                }
                            });

    List<Tuple2<Integer, Long>> result = reduceDs.collect();
    compareResultAsTuples(result, "1,1\n2,5\n3,15\n4,34\n5,65\n");
}
@Test
public void testGroupReduceWithAtomicValue() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Integer> numbers = env.fromElements(1, 1, 2, 3, 4);

    // "*" selects the whole (atomic) value as the grouping key; keep one element per group
    DataSet<Integer> firstPerGroup =
            numbers.groupBy("*")
                    .reduceGroup(
                            new GroupReduceFunction<Integer, Integer>() {
                                @Override
                                public void reduce(
                                        Iterable<Integer> group, Collector<Integer> out)
                                        throws Exception {
                                    Iterator<Integer> it = group.iterator();
                                    out.collect(it.next());
                                }
                            });

    List<Integer> result = firstPerGroup.collect();
    compareResultAsText(result, "1\n2\n3\n4");
}
/**
 * Regression test for FLINK-2019: groups on a Joda-Time {@code DateTime} field, then sums and
 * projects the integer field.
 *
 * @throws Exception if building or executing the job fails
 */
@Test
public void testJodatimeDateTimeWithKryo() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<Integer, DateTime>> ds = env.fromElements(new Tuple2<>(1, DateTime.now()));
    // project(0) keeps only field 0, so the elements are 1-ary tuples rather than Tuple2s;
    // declare them with the generic Tuple type instead of a misleading Tuple2<Integer, DateTime>.
    DataSet<Tuple> reduceDs = ds.groupBy("f1").sum(0).project(0);
    List<Tuple> result = reduceDs.collect();

    String expected = "1\n";

    compareResultAsTuples(result, expected);
}
/**
 * Regression test for FLINK-2158: a tuple whose {@code Date} field is {@code null} must pass
 * through a grouped reduce alongside tuples with non-null dates.
 *
 * @throws Exception if building or executing the job fails
 */
@Test
public void testDateNullException() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Date>> input =
            env.fromElements(
                    new Tuple2<>(0, new Date(1230000000)),
                    new Tuple2<Integer, Date>(1, null),
                    new Tuple2<>(2, new Date(1230000000)));

    // emit the integer field of every element, as text
    DataSet<String> keys =
            input.groupBy(0)
                    .reduceGroup(
                            new GroupReduceFunction<Tuple2<Integer, Date>, String>() {
                                @Override
                                public void reduce(
                                        Iterable<Tuple2<Integer, Date>> group,
                                        Collector<String> out)
                                        throws Exception {
                                    for (Tuple2<Integer, Date> t : group) {
                                        out.collect(Integer.toString(t.f0));
                                    }
                                }
                            });

    List<String> result = keys.collect();
    compareResultAsText(result, "0\n1\n2\n");
}
/**
 * Builds one string per group. It starts with {@code "call"}; then, for every pojo in the group,
 * it appends {@code "\nFor key <bigInt> we got:\n"} followed by the pojo's {@code toString()}.
 */
private static class GroupReducer8
        implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<CollectionDataSets.PojoWithCollection> values, Collector<String> out) {
        StringBuilder concat = new StringBuilder();
        concat.append("call");
        for (CollectionDataSets.PojoWithCollection value : values) {
            concat.append("\nFor key ").append(value.bigInt).append(" we got:\n").append(value);
        }
        out.collect(concat.toString());
    }
}
/**
 * Builds one string per group: the first element's {@code f1} followed by {@code "--"}, then
 * every element's nested tuple {@code f0} followed by {@code "-"}, in iteration order.
 */
private static class NestedTupleReducer
        implements GroupReduceFunction<Tuple2<Tuple2<Integer, Integer>, String>, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values, Collector<String> out) {
        boolean once = false;
        StringBuilder concat = new StringBuilder();
        for (Tuple2<Tuple2<Integer, Integer>, String> value : values) {
            if (!once) {
                // the string field is printed only once, as a prefix for the whole group
                concat.append(value.f1).append("--");
                once = true;
            }
            concat.append(value.f0); // the tuple with the sorted groups
            concat.append("-");
        }
        out.collect(concat.toString());
    }
}
/**
 * Sums field 0 over the group and emits it together with the field-1 value of the last element
 * seen.
 */
private static class Tuple3GroupReduce
        implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple2<Integer, Long>> out) {
        int total = 0;
        long lastKey = 0L;
        Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
        while (it.hasNext()) {
            Tuple3<Integer, Long, String> current = it.next();
            total += current.f0;
            lastKey = current.f1;
        }
        out.collect(new Tuple2<>(total, lastKey));
    }
}
/**
 * Emits one tuple per group: the sum of field 0, the field-1 value of the last element, and all
 * field-2 strings joined with {@code "-"} in iteration order.
 */
private static class Tuple3SortedGroupReduce
        implements GroupReduceFunction<
                Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        int total = 0;
        long lastKey = 0;
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (Tuple3<Integer, Long, String> t : values) {
            total += t.f0;
            lastKey = t.f1;
            // separator goes between entries only, never before the first one
            if (!first) {
                joined.append('-');
            }
            joined.append(t.f2);
            first = false;
        }
        out.collect(new Tuple3<>(total, lastKey, joined.toString()));
    }
}
/**
 * Emits one tuple per group:
 *
 * <ul>
 *   <li>fields 0 and 4 are taken from the last element seen;
 *   <li>field 1 is summed over the group;
 *   <li>fields 2 and 3 are the constants {@code 0} and {@code "P-)"}.
 * </ul>
 */
private static class Tuple5GroupReduce
        implements GroupReduceFunction<
                Tuple5<Integer, Long, Integer, String, Long>,
                Tuple5<Integer, Long, Integer, String, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple5<Integer, Long, Integer, String, Long>> values,
            Collector<Tuple5<Integer, Long, Integer, String, Long>> out) {
        int lastF0 = 0;
        long sumF1 = 0L;
        long lastF4 = 0L;
        Iterator<Tuple5<Integer, Long, Integer, String, Long>> it = values.iterator();
        while (it.hasNext()) {
            Tuple5<Integer, Long, Integer, String, Long> current = it.next();
            lastF0 = current.f0;
            sumF1 += current.f1;
            lastF4 = current.f4;
        }
        out.collect(new Tuple5<>(lastF0, sumF1, 0, "P-)", lastF4));
    }
}
/**
 * Emits one tuple per group:
 *
 * <ul>
 *   <li>fields 0 and 4 are taken from the last element seen;
 *   <li>field 1 is summed over the group;
 *   <li>field 2 is {@code 0};
 *   <li>field 3 is all field-3 strings joined with {@code "-"} in iteration order.
 * </ul>
 */
private static class Tuple5SortedGroupReduce
        implements GroupReduceFunction<
                Tuple5<Integer, Long, Integer, String, Long>,
                Tuple5<Integer, Long, Integer, String, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple5<Integer, Long, Integer, String, Long>> values,
            Collector<Tuple5<Integer, Long, Integer, String, Long>> out) {
        int lastF0 = 0;
        long sumF1 = 0L;
        long lastF4 = 0L;
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (Tuple5<Integer, Long, Integer, String, Long> t : values) {
            lastF0 = t.f0;
            sumF1 += t.f1;
            // separator goes between entries only, never before the first one
            if (!first) {
                joined.append('-');
            }
            joined.append(t.f3);
            first = false;
            lastF4 = t.f4;
        }
        out.collect(new Tuple5<>(lastF0, sumF1, 0, joined.toString(), lastF4));
    }
}
/**
 * Emits one {@link CustomType} per group:
 *
 * <ul>
 *   <li>{@code myInt} comes from the first element;
 *   <li>{@code myLong} is summed over the whole group;
 *   <li>{@code myString} is {@code "Hello!"}.
 * </ul>
 */
private static class CustomTypeGroupReduce
        implements GroupReduceFunction<CustomType, CustomType> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
        final Iterator<CustomType> it = values.iterator();
        CustomType result = new CustomType();
        CustomType head = it.next();

        result.myString = "Hello!";
        result.myInt = head.myInt;
        result.myLong = head.myLong;

        while (it.hasNext()) {
            result.myLong += it.next().myLong;
        }
        out.collect(result);
    }
}
/**
 * Emits one {@link CustomType} per group:
 *
 * <ul>
 *   <li>{@code myInt} comes from the first element;
 *   <li>{@code myLong} is summed over the group;
 *   <li>{@code myString} is all {@code myString} values joined with {@code "-"} in iteration
 *       order.
 * </ul>
 */
private static class CustomTypeSortedGroupReduce
        implements GroupReduceFunction<CustomType, CustomType> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
        final Iterator<CustomType> it = values.iterator();
        CustomType result = new CustomType();
        CustomType head = it.next();

        result.myInt = head.myInt;
        result.myLong = head.myLong;
        StringBuilder names = new StringBuilder(head.myString);

        while (it.hasNext()) {
            CustomType next = it.next();
            names.append('-').append(next.myString);
            result.myLong += next.myLong;
        }
        result.myString = names.toString();
        out.collect(result);
    }
}
/**
 * For every element whose field 0 is below 4, mutates the input object and emits that same
 * object twice:
 *
 * <ul>
 *   <li>first with field 0 increased by 10 and field 2 set to {@code "Hi!"};
 *   <li>then with field 0 increased by a further 10 and field 2 set to {@code "Hi again!"}.
 * </ul>
 *
 * <p>All other elements are dropped.
 */
private static class InputReturningTuple3GroupReduce
        implements GroupReduceFunction<
                Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        for (Tuple3<Integer, Long, String> t : values) {
            if (t.f0 >= 4) {
                continue;
            }
            t.f0 += 10;
            t.f2 = "Hi!";
            out.collect(t);

            // the very same input object is modified again and emitted a second time
            t.f0 += 10;
            t.f2 = "Hi again!";
            out.collect(t);
        }
    }
}
/**
 * Sums fields 0 and 1 over the group and emits them with the constant string
 * {@code "Hello World"}.
 */
private static class AllAddingTuple3GroupReduce
        implements GroupReduceFunction<
                Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        int sumF0 = 0;
        long sumF1 = 0L;
        Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
        while (it.hasNext()) {
            Tuple3<Integer, Long, String> current = it.next();
            sumF0 += current.f0;
            sumF1 += current.f1;
        }
        out.collect(new Tuple3<>(sumF0, sumF1, "Hello World"));
    }
}
/**
 * Sums {@code myInt} and {@code myLong} over the group into a fresh {@link CustomType} whose
 * {@code myString} is {@code "Hello!"}.
 */
private static class AllAddingCustomTypeGroupReduce
        implements GroupReduceFunction<CustomType, CustomType> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
        CustomType sum = new CustomType(0, 0, "Hello!");
        Iterator<CustomType> it = values.iterator();
        while (it.hasNext()) {
            CustomType element = it.next();
            sum.myInt += element.myInt;
            sum.myLong += element.myLong;
        }
        out.collect(sum);
    }
}
/**
 * Sums field 0 over the group and emits it with the last element's field 1. Field 2 is replaced
 * by the sum of the integers in the {@code "ints"} broadcast variable, rendered as a string.
 */
private static class BCTuple3GroupReduce
        extends RichGroupReduceFunction<
                Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    /** Value written into field 2; computed once per task in {@link #open}. */
    private String f2Replace = "";

    @Override
    public void open(Configuration config) {
        int total = 0;
        Collection<Integer> broadcastInts = getRuntimeContext().getBroadcastVariable("ints");
        for (int value : broadcastInts) {
            total += value;
        }
        f2Replace = String.valueOf(total);
    }

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        int sumF0 = 0;
        long lastF1 = 0L;
        for (Tuple3<Integer, Long, String> current : values) {
            sumF0 += current.f0;
            lastF1 = current.f1;
        }
        out.collect(new Tuple3<>(sumF0, lastF1, f2Replace));
    }
}
/**
 * Combinable reduce over {@code Tuple3<Integer, Long, String>}.
 *
 * <p>The combiner sums field 0, keeps the last field-1 value and sets field 2 to {@code "test"}
 * followed by that field-1 value. The reducer sums field 0 and keeps the last field-2 string.
 */
private static class Tuple3GroupReduceWithCombine
        implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>>,
                GroupCombineFunction<
                        Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void combine(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        int sum = 0;
        Long lastKey = 0L;
        String label = "";
        for (Tuple3<Integer, Long, String> current : values) {
            sum += current.f0;
            lastKey = current.f1;
            label = "test" + lastKey;
        }
        out.collect(new Tuple3<>(sum, lastKey, label));
    }

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple2<Integer, String>> out) {
        int sum = 0;
        String lastLabel = "";
        Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
        while (it.hasNext()) {
            Tuple3<Integer, Long, String> current = it.next();
            sum += current.f0;
            lastLabel = current.f2;
        }
        out.collect(new Tuple2<>(sum, lastLabel));
    }
}
/**
 * Combinable reduce over {@code Tuple3<Integer, Long, String>}.
 *
 * <p>The combiner sums field 0, keeps the last field-1 value and joins all field-2 strings with
 * {@code "-"} in iteration order. The reducer sums field 0 and keeps the last field-2 string.
 */
private static class Tuple3SortedGroupReduceWithCombine
        implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>>,
                GroupCombineFunction<
                        Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void combine(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        int total = 0;
        long lastKey = 0;
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (Tuple3<Integer, Long, String> t : values) {
            total += t.f0;
            lastKey = t.f1;
            // separator goes between entries only, never before the first one
            if (!first) {
                joined.append('-');
            }
            joined.append(t.f2);
            first = false;
        }
        out.collect(new Tuple3<>(total, lastKey, joined.toString()));
    }

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple2<Integer, String>> out) {
        int total = 0;
        String lastLabel = "";
        Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
        while (it.hasNext()) {
            Tuple3<Integer, Long, String> current = it.next();
            total += current.f0;
            lastLabel = current.f2;
        }
        out.collect(new Tuple2<>(total, lastLabel));
    }
}
/**
 * Combinable all-group reduce over {@code Tuple3<Integer, Long, String>}.
 *
 * <p>The combiner sums fields 0 and 1 and appends {@code "test"} once per element to field 2.
 * The reducer adds {@code f0 + f1} of every element into an {@code int} and concatenates all
 * field-2 strings.
 */
private static class Tuple3AllGroupReduceWithCombine
        implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>>,
                GroupCombineFunction<
                        Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void combine(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        Tuple3<Integer, Long, String> o = new Tuple3<>(0, 0L, "");
        for (Tuple3<Integer, Long, String> t : values) {
            o.f0 += t.f0;
            o.f1 += t.f1;
            o.f2 += "test";
        }
        out.collect(o);
    }

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple2<Integer, String>> out) {
        int i = 0;
        StringBuilder s = new StringBuilder();
        for (Tuple3<Integer, Long, String> t : values) {
            // f0 + f1 is a long; the narrowing to int used to be hidden inside the compound
            // assignment. Made explicit here (the result is identical modulo 2^32).
            i += (int) (t.f0 + t.f1);
            s.append(t.f2);
        }
        out.collect(new Tuple2<>(i, s.toString()));
    }
}
/**
 * Combinable reduce over {@link CustomType}.
 *
 * <p>The combiner keeps the last {@code myInt}, sums {@code myLong} and sets {@code myString} to
 * {@code "test"} followed by the last {@code myInt}. The reducer keeps the last {@code myInt} and
 * {@code myString} and sums {@code myLong}.
 */
private static class CustomTypeGroupReduceWithCombine
        implements GroupReduceFunction<CustomType, CustomType>,
                GroupCombineFunction<CustomType, CustomType> {
    private static final long serialVersionUID = 1L;

    @Override
    public void combine(Iterable<CustomType> values, Collector<CustomType> out)
            throws Exception {
        CustomType acc = new CustomType();
        for (CustomType element : values) {
            acc.myString = "test" + element.myInt;
            acc.myInt = element.myInt;
            acc.myLong += element.myLong;
        }
        out.collect(acc);
    }

    @Override
    public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
        CustomType acc = new CustomType(0, 0, "");
        Iterator<CustomType> it = values.iterator();
        while (it.hasNext()) {
            CustomType element = it.next();
            acc.myString = element.myString;
            acc.myInt = element.myInt;
            acc.myLong += element.myLong;
        }
        out.collect(acc);
    }
}
/**
 * Combinable group reduce used to detect out-of-order input.
 *
 * <p>Both {@code combine} and {@code reduce} always emit the group's first element unchanged.
 * After that they emit only the later elements whose field 0 is smaller than the first element's
 * field 0, with field 2 overwritten by {@code "INVALID-ORDER!"} first. {@code reduce}
 * additionally re-emits elements whose field 2 is already {@code "INVALID-ORDER!"}, e.g. marked
 * by the combiner. All other elements are dropped.
 *
 * <p>NOTE(review): {@code i} is never updated inside the loops, so each element is compared
 * against the group's first element only, not against its predecessor. Confirm this weaker
 * ordering check is intended.
 */
private static class OrderCheckingCombinableReduce
        implements GroupReduceFunction<
                        Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
                GroupCombineFunction<
                        Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void reduce(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out)
            throws Exception {
        Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
        Tuple3<Integer, Long, String> t = it.next();
        // reference value: field 0 of the first element
        int i = t.f0;
        out.collect(t);
        while (it.hasNext()) {
            t = it.next();
            // emit elements that are out of order or were already flagged by the combiner
            if (i > t.f0 || t.f2.equals("INVALID-ORDER!")) {
                t.f2 = "INVALID-ORDER!";
                out.collect(t);
            }
        }
    }

    @Override
    public void combine(
            Iterable<Tuple3<Integer, Long, String>> values,
            Collector<Tuple3<Integer, Long, String>> out) {
        Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
        Tuple3<Integer, Long, String> t = it.next();
        // reference value: field 0 of the first element
        int i = t.f0;
        out.collect(t);
        while (it.hasNext()) {
            t = it.next();
            // only out-of-order elements are forwarded (flagged); in-order ones are dropped
            if (i > t.f0) {
                t.f2 = "INVALID-ORDER!";
                out.collect(t);
            }
        }
    }
}
/**
 * Map function that returns every input element unchanged.
 *
 * @param <T> the element type
 */
private static final class IdentityMapper<T> extends RichMapFunction<T, T> {
    private static final long serialVersionUID = 1L;

    @Override
    public T map(T value) {
        return value;
    }
}
/**
 * Counts the elements produced by one traversal of the given iterable.
 *
 * @param iterable the iterable to traverse; its iterator is obtained exactly once
 * @return the number of elements visited
 */
private static int countElements(Iterable<?> iterable) {
    int count = 0;
    Iterator<?> it = iterable.iterator();
    while (it.hasNext()) {
        it.next();
        count++;
    }
    return count;
}
}