---
title: "Java API Transformations"
---
<section id="top">
DataSet Transformations
-----------------------
This document gives a deep dive into the available transformations on DataSets. For a general introduction to the
Flink Java API, please refer to the [API guide](java_api_guide.html).
### Map
The Map transformation applies a user-defined `MapFunction` on each element of a DataSet.
It implements a one-to-one mapping, that is, exactly one element must be returned by
the function.
The following code transforms a `DataSet` of Integer pairs into a `DataSet` of Integers:
```java
// MapFunction that adds two integer values
public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
  @Override
  public Integer map(Tuple2<Integer, Integer> in) {
    return in.f0 + in.f1;
  }
}
// [...]
DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
DataSet<Integer> intSums = intPairs.map(new IntAdder());
```
### FlatMap
The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a `DataSet`.
This variant of a map function can return arbitrarily many result elements (including none) for each input element.
The following code transforms a `DataSet` of text lines into a `DataSet` of words:
```java
// FlatMapFunction that tokenizes a String at non-word characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String, String> {
  @Override
  public void flatMap(String value, Collector<String> out) {
    for (String token : value.split("\\W+")) {
      out.collect(token);
    }
  }
}
// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());
```
### Filter
The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataSet` and retains only those elements for which the function returns `true`.
The following code removes all Integers smaller than zero from a `DataSet`:
```java
// FilterFunction that filters out all Integers smaller than zero.
public class NaturalNumberFilter implements FilterFunction<Integer> {
  @Override
  public boolean filter(Integer number) {
    return number >= 0;
  }
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
```
### Project (Tuple DataSets only)
The Project transformation removes or moves `Tuple` fields of a `Tuple` `DataSet`.
The `project(int...)` method selects the `Tuple` fields that should be retained by their index and defines their order in the output `Tuple`.
The `types(Class<?> ...)` method must give the types of the output `Tuple` fields.
Projections do not require the definition of a user function.
The following code shows different ways to apply a Project transformation on a `DataSet`:
```java
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2, 0).types(String.class, Integer.class);
```
### Transformations on grouped DataSet
Reduce operations can also be applied on grouped data sets. The key used for
grouping can be specified in two ways:
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
Please look at the reduce examples to see how the grouping keys are specified.
### Reduce on grouped DataSet
A Reduce transformation that is applied on a grouped `DataSet` reduces each group to a single element using a user-defined `ReduceFunction`.
For each group of input elements, a `ReduceFunction` successively combines pairs of elements into one element until only a single element for each group remains.
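This pairwise folding can be sketched outside of Flink in plain Java; the word-count data and the summing function below are illustrative stand-ins, not Flink API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class GroupedReduceSketch {
  public static void main(String[] args) {
    // hypothetical input: (word, count) pairs
    List<Map.Entry<String, Integer>> input = List.of(
        Map.entry("flink", 1), Map.entry("java", 1), Map.entry("flink", 1));

    // the "ReduceFunction": combines two counts into one
    BinaryOperator<Integer> reducer = Integer::sum;

    // group by key, then fold each group pairwise until one element remains
    Map<String, Integer> result = new HashMap<>();
    for (Map.Entry<String, Integer> e : input) {
      result.merge(e.getKey(), e.getValue(), reducer);
    }
    System.out.println(result.get("flink")); // 2
    System.out.println(result.get("java"));  // 1
  }
}
```

Because the function only ever sees two elements at a time, the runtime is free to apply it in any combination order within a group.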
#### Reduce on DataSet grouped by KeySelector Function
A `KeySelector` function extracts a key value from each element of a `DataSet`. The extracted key value is used to group the `DataSet`.
The following code shows how to group a POJO `DataSet` using a `KeySelector` function and to reduce it with a `ReduceFunction`.
```java
// some ordinary POJO
public class WC {
  public String word;
  public int count;
  // [...]
}
// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
  @Override
  public WC reduce(WC in1, WC in2) {
    return new WC(in1.word, in1.count + in2.count);
  }
}
// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
  // DataSet grouping with inline-defined KeySelector function
  .groupBy(new KeySelector<WC, String>() {
    public String getKey(WC wc) { return wc.word; }
  })
  // apply ReduceFunction on grouped DataSet
  .reduce(new WordCounter());
```
#### Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
Field position keys specify one or more fields of a `Tuple` `DataSet` that are used as grouping keys.
The following code shows how to use field position keys and apply a `ReduceFunction`.
```java
DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples
  // group DataSet on first and second field of Tuple
  .groupBy(0, 1)
  // apply ReduceFunction on grouped DataSet
  .reduce(new MyTupleReducer());
```
### GroupReduce on grouped DataSet
A GroupReduce transformation that is applied on a grouped `DataSet` calls a user-defined `GroupReduceFunction` for each group. The difference
between this and Reduce is that the user-defined function gets the whole group at once.
The function is invoked with an Iterable over all elements of a group and can return an arbitrary number of result elements using the collector.
#### GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
The following code shows how duplicate strings can be removed from a `DataSet` grouped by Integer.
```java
public class DistinctReduce
    implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
  @Override
  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
    Set<String> uniqStrings = new HashSet<String>();
    Integer key = null;
    // add all strings of the group to the set
    for (Tuple2<Integer, String> t : in) {
      key = t.f0;
      uniqStrings.add(t.f1);
    }
    // emit all unique strings
    for (String s : uniqStrings) {
      out.collect(new Tuple2<Integer, String>(key, s));
    }
  }
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
  .groupBy(0)                         // group DataSet by the first tuple field
  .reduceGroup(new DistinctReduce()); // apply GroupReduceFunction
```
#### GroupReduce on DataSet grouped by KeySelector Function
Works analogously to grouping by a `KeySelector` function in Reduce transformations.
#### GroupReduce on sorted groups (Tuple DataSets only)
A `GroupReduceFunction` accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined `GroupReduceFunction` and improve its efficiency.
Right now, this feature is only available for DataSets of Tuples.
The following code shows another example of how to remove duplicate Strings from a `DataSet` grouped by an Integer and sorted by String.
```java
// GroupReduceFunction that removes consecutive identical elements
public class DistinctReduce
    implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
  @Override
  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
    Integer key = null;
    String comp = null;
    for (Tuple2<Integer, String> t : in) {
      key = t.f0;
      String next = t.f1;
      // check if strings are different
      if (comp == null || !next.equals(comp)) {
        out.collect(new Tuple2<Integer, String>(key, next));
        comp = next;
      }
    }
  }
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
  .groupBy(0)                    // group DataSet by first field
  .sortGroup(1, Order.ASCENDING) // sort groups on second tuple field
  .reduceGroup(new DistinctReduce());
```
**Note:** A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
#### Combinable GroupReduceFunctions
In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not
necessarily combinable. In order to make a `GroupReduceFunction`
combinable, you need to use the `RichGroupReduceFunction` variant,
implement (override) the `combine()` method, and annotate the
`GroupReduceFunction` with the `@Combinable` annotation as shown here:
```java
// Combinable GroupReduceFunction that computes two sums.
// Note that we use the RichGroupReduceFunction because it defines the combine method
@Combinable
public class MyCombinableGroupReducer
    extends RichGroupReduceFunction<Tuple3<String, Integer, Double>,
                                    Tuple3<String, Integer, Double>> {
  @Override
  public void reduce(Iterable<Tuple3<String, Integer, Double>> in,
                     Collector<Tuple3<String, Integer, Double>> out) {
    String key = null;
    int intSum = 0;
    double doubleSum = 0.0;
    for (Tuple3<String, Integer, Double> curr : in) {
      key = curr.f0;
      intSum += curr.f1;
      doubleSum += curr.f2;
    }
    // emit a tuple with both sums
    out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
  }
  @Override
  public void combine(Iterable<Tuple3<String, Integer, Double>> in,
                      Collector<Tuple3<String, Integer, Double>> out) {
    // in some cases combine() calls can simply be forwarded to reduce()
    this.reduce(in, out);
  }
}
```
### Aggregate on grouped Tuple DataSet
There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
- Sum,
- Min, and
- Max.
The Aggregate transformation can only be applied on a `Tuple` `DataSet` and supports only field position keys for grouping.
The following code shows how to apply an Aggregation transformation on a `DataSet` grouped by field position keys:
```java
DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
  .groupBy(1)        // group DataSet on second field
  .aggregate(SUM, 0) // compute sum of the first field
  .and(MIN, 2);      // compute minimum of the third field
```
To apply multiple aggregations on a `DataSet`, the `.and()` function must be used after the first aggregate: `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original `DataSet`.
In contrast, `.aggregate(SUM, 0).aggregate(MIN, 2)` applies an aggregation on an aggregation: it produces the minimum of field 2 after computing the sum of field 0 grouped by field 1.
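The single-pass `.and()` semantics can be sketched in plain Java; the `Row` record below is a hypothetical stand-in for `Tuple3<Integer, String, Double>` and the data is invented for illustration:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupedAggregateSketch {
  // hypothetical stand-in for Tuple3<Integer, String, Double>
  record Row(int f0, String f1, double f2) {}

  public static void main(String[] args) {
    List<Row> input = List.of(
        new Row(3, "a", 2.0), new Row(5, "a", 1.5), new Row(7, "b", 4.0));

    // one pass over each group computes SUM of field 0 and MIN of field 2 together
    Map<String, Integer> sums = new HashMap<>();
    Map<String, Double> mins = new HashMap<>();
    for (Row r : input) {
      sums.merge(r.f1(), r.f0(), Integer::sum);
      mins.merge(r.f1(), r.f2(), Math::min);
    }
    System.out.println(sums.get("a") + " " + mins.get("a")); // 8 1.5
  }
}
```

Both aggregates here are computed in the same traversal of each group, which is what distinguishes `.and()` from chaining a second `.aggregate()` over the already-aggregated result.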
**Note:** The set of aggregation functions will be extended in the future.
### Reduce on full DataSet
The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataSet`.
The `ReduceFunction` subsequently combines pairs of elements into one element until only a single element remains.
The following code shows how to sum all elements of an Integer `DataSet`:
```java
// ReduceFunction that sums Integers
public class IntSummer implements ReduceFunction<Integer> {
  @Override
  public Integer reduce(Integer num1, Integer num2) {
    return num1 + num2;
  }
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
```
Reducing a full `DataSet` using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
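The effect of automatic combining can be illustrated in plain Java; the partitioning of the data is hypothetical:

```java
import java.util.List;

public class CombineSketch {
  public static void main(String[] args) {
    // hypothetical data split across two parallel partitions
    List<Integer> partition1 = List.of(1, 2, 3);
    List<Integer> partition2 = List.of(4, 5);

    // each partition pre-combines locally with the same ReduceFunction...
    int partial1 = partition1.stream().reduce(0, Integer::sum);
    int partial2 = partition2.stream().reduce(0, Integer::sum);

    // ...and only the partial results reach the final, non-parallel reduce
    int combined = partial1 + partial2;

    // same result as reducing everything in one place
    int direct = 1 + 2 + 3 + 4 + 5;
    System.out.println(combined == direct); // true
  }
}
```

Because a `ReduceFunction` combines two elements into one of the same type, this local pre-aggregation always yields the same final result while drastically reducing the data shipped to the final reduce.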
### GroupReduce on full DataSet
The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a `DataSet`.
A `GroupReduceFunction` can iterate over all elements of the `DataSet` and return an arbitrary number of result elements.
The following example shows how to apply a GroupReduce transformation on a full `DataSet`:
```java
DataSet<Integer> input = // [...]
// apply a (preferably combinable) GroupReduceFunction to a DataSet
DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
```
**Note:** A GroupReduce transformation on a full `DataSet` cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute-intensive operation. See the paragraph on "Combinable GroupReduceFunctions" above to learn how to implement a combinable `GroupReduceFunction`.
### Aggregate on full Tuple DataSet
There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
- Sum,
- Min, and
- Max.
The Aggregate transformation can only be applied on a `Tuple` `DataSet`.
The following code shows how to apply an Aggregation transformation on a full `DataSet`:
```java
DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input
  .aggregate(SUM, 0) // compute sum of the first field
  .and(MIN, 1);      // compute minimum of the second field
```
**Note:** Extending the set of supported aggregation functions is on our roadmap.
### Join
The Join transformation joins two `DataSet`s into one `DataSet`. The elements of both `DataSet`s are joined on one or more keys which can be specified using
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
There are a few different ways to perform a Join transformation which are shown in the following.
#### Default Join (Join into Tuple2)
The default Join transformation produces a new `Tuple` `DataSet` with two fields. Each tuple holds a joined element of the first input `DataSet` in the first tuple field and a matching element of the second input `DataSet` in the second field.
The following code shows a default Join transformation using field position keys:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Double, Integer>> input2 = // [...]
// result dataset is typed as Tuple2
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>> result =
  input1.join(input2)
    .where(0)    // key of the first input
    .equalTo(1); // key of the second input
```
#### Join with JoinFunction
A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
A `JoinFunction` receives one element of the first input `DataSet` and one element of the second input `DataSet` and returns exactly one element.
The following code performs a join of a `DataSet` with custom Java objects and a `Tuple` `DataSet` using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
```java
// some POJO
public class Rating {
  public String name;
  public String category;
  public int points;
}
// JoinFunction that joins a custom POJO with a Tuple
public class PointWeighter
    implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
  @Override
  public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
    // multiply the points by the weight and construct a new output tuple
    return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
  }
}
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<String, Double>> weightedRatings = ratings.join(weights)
  // key of the first input
  .where(new KeySelector<Rating, String>() {
    public String getKey(Rating r) { return r.category; }
  })
  // key of the second input
  .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
    public String getKey(Tuple2<String, Double> t) { return t.f0; }
  })
  // apply the JoinFunction on joining pairs
  .with(new PointWeighter());
```
#### Join with FlatJoinFunction
Analogous to Map and FlatMap, a `FlatJoinFunction` behaves in the same
way as a `JoinFunction`, but instead of returning one element, it can
return (collect) zero, one, or more elements.
```java
public class PointWeighter
    implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
  @Override
  public void join(Rating rating, Tuple2<String, Double> weight,
                   Collector<Tuple2<String, Double>> out) {
    if (weight.f1 > 0.1) {
      out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
    }
  }
}
DataSet<Tuple2<String, Double>> weightedRatings = ratings.join(weights) // [...]
```
#### Join with Projection
A Join transformation can construct result tuples using a projection as shown here:
```java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>> result =
  input1.join(input2)
    // key definition on first DataSet using a field position key
    .where(0)
    // key definition of second DataSet using a field position key
    .equalTo(0)
    // select and reorder fields of matching tuples
    .projectFirst(0, 2).projectSecond(1).projectFirst(1)
    .types(Integer.class, String.class, Double.class, Byte.class);
```
`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output `Tuple`. The order of indexes defines the order of fields in the output tuple.
The join projection also works for non-`Tuple` `DataSet`s. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output `Tuple`.
#### Join with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to join as shown here:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> result1 =
  // hint that the second DataSet is very small
  input1.joinWithTiny(input2)
    .where(0)
    .equalTo(0);
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> result2 =
  // hint that the second DataSet is very large
  input1.joinWithHuge(input2)
    .where(0)
    .equalTo(0);
```
### Cross
The Cross transformation combines two `DataSet`s into one `DataSet`. It builds all pairwise combinations of the elements of both input `DataSet`s, i.e., it builds a Cartesian product.
The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
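The quadratic growth behind that warning is easy to see in a plain-Java sketch; the two input lists are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class CrossSizeSketch {
  public static void main(String[] args) {
    List<String> first = List.of("a", "b", "c");
    List<Integer> second = List.of(1, 2, 3, 4);

    // Cartesian product: every element of the first input paired with every element of the second
    List<String> pairs = new ArrayList<>();
    for (String s : first) {
      for (Integer i : second) {
        pairs.add(s + i);
      }
    }
    // result size is |first| * |second| -- this product is why Cross is expensive
    System.out.println(pairs.size()); // 12
  }
}
```

With inputs of a million elements each, the product already has 10^12 pairs, which is why the size hints below matter for Cross.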
#### Cross with User-Defined Function
A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
The following code shows how to apply a Cross transformation on two `DataSet`s using a `CrossFunction`:
```java
public class Coord {
  public int id;
  public int x;
  public int y;
}
// CrossFunction that computes the Euclidean distance between two Coord objects
public class EuclideanDistComputer
    implements CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
  @Override
  public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
    // compute Euclidean distance of coordinates
    double dist = Math.sqrt(Math.pow(c1.x - c2.x, 2) + Math.pow(c1.y - c2.y, 2));
    return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
  }
}
DataSet<Coord> coords1 = // [...]
DataSet<Coord> coords2 = // [...]
DataSet<Tuple3<Integer, Integer, Double>> distances = coords1.cross(coords2)
  // apply CrossFunction
  .with(new EuclideanDistComputer());
```
#### Cross with Projection
A Cross transformation can also construct result tuples using a projection as shown here:
```java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, Byte, Integer, Double>> result =
  input1.cross(input2)
    // select and reorder fields of matching tuples
    .projectSecond(0).projectFirst(1, 0).projectSecond(1)
    .types(Integer.class, Byte.class, Integer.class, Double.class);
```
The field selection in a Cross projection works the same way as in the projection of Join results.
#### Cross with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to cross as shown here:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple4<Integer, String, Integer, String>> udfResult =
  // hint that the second DataSet is very small
  input1.crossWithTiny(input2)
    // apply any Cross function (or projection)
    .with(new MyCrosser());
DataSet<Tuple3<Integer, String, String>> projectResult =
  // hint that the second DataSet is very large
  input1.crossWithHuge(input2)
    // apply a projection (or any Cross function)
    .projectFirst(0, 1).projectSecond(1)
    .types(Integer.class, String.class, String.class);
```
### CoGroup
The CoGroup transformation jointly processes groups of two `DataSet`s. Both `DataSet`s are grouped on a defined key and groups of both `DataSet`s that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one `DataSet` has a group, the `CoGroupFunction` is called with this group and an empty group.
A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
Similar to Reduce, GroupReduce, and Join, keys can be defined using
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
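The pairing of groups, including the empty-group case for keys that appear on only one side, can be sketched in plain Java; the keys and values here are hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CoGroupSketch {
  public static void main(String[] args) {
    // hypothetical inputs: key -> group of values on each side
    Map<String, List<Integer>> left = Map.of("a", List.of(1, 2), "b", List.of(3));
    Map<String, List<Integer>> right = Map.of("a", List.of(10), "c", List.of(20));

    // every key from either side is visited once; the missing side gets an empty group
    Set<String> keys = new TreeSet<>(left.keySet());
    keys.addAll(right.keySet());
    for (String key : keys) {
      List<Integer> l = left.getOrDefault(key, List.of());
      List<Integer> r = right.getOrDefault(key, List.of());
      // stand-in for the CoGroupFunction: report the group sizes per key
      System.out.println(key + ": " + l.size() + " vs " + r.size());
    }
    // prints a: 2 vs 1, then b: 1 vs 0, then c: 0 vs 1
  }
}
```

This is what distinguishes CoGroup from Join: a Join produces nothing for unmatched keys, while a `CoGroupFunction` is still called with one empty group.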
#### CoGroup on DataSets grouped by Field Position Keys (Tuple DataSets only)
```java
// some CoGroupFunction definition
class MyCoGrouper
    implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
  @Override
  public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
                      Iterable<Tuple2<String, Double>> dVals,
                      Collector<Double> out) {
    Set<Integer> ints = new HashSet<Integer>();
    // add all Integer values in group to set
    for (Tuple2<String, Integer> val : iVals) {
      ints.add(val.f1);
    }
    // multiply each Double value with each unique Integer value of the group
    for (Tuple2<String, Double> val : dVals) {
      for (Integer i : ints) {
        out.collect(val.f1 * i);
      }
    }
  }
}
// [...]
DataSet<Tuple2<String, Integer>> iVals = // [...]
DataSet<Tuple2<String, Double>> dVals = // [...]
DataSet<Double> output = iVals.coGroup(dVals)
  // group first DataSet on first tuple field
  .where(0)
  // group second DataSet on first tuple field
  .equalTo(0)
  // apply CoGroup function on each pair of groups
  .with(new MyCoGrouper());
```
#### CoGroup on DataSets grouped by Key Selector Function
Works analogously to key selector functions in Join transformations.
### Union
Produces the union of two `DataSet`s, which have to be of the same type. A union of more than two `DataSet`s can be implemented with multiple union calls, as shown here:
```java
DataSet<Tuple2<String, Integer>> vals1 = // [...]
DataSet<Tuple2<String, Integer>> vals2 = // [...]
DataSet<Tuple2<String, Integer>> vals3 = // [...]
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
.union(vals3);
```
[Back to top](#top)