---
title: "Operators"
nav-id: streaming_operators
nav-show_overview: true
nav-parent_id: streaming
nav-pos: 9
---

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.

* toc
{:toc}

DataStream Transformations

    <tr>
      <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
      <td>
        <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
{% highlight java %}

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
{% endhighlight %}
      </td>
    </tr>

Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

{% highlight java %}
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
{% endhighlight %}

KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

{% highlight java %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
{% endhighlight %}

Attention: A type cannot be a key if:

* it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
* it is an array of any type.

Reduce
KeyedStream → DataStream

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

{% highlight java %}
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
{% endhighlight %}

Fold
KeyedStream → DataStream

A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence “start-1”, “start-1-2”, “start-1-2-3”, ...

{% highlight java %}
DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
{% endhighlight %}

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight java %}
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
{% endhighlight %}

Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

{% highlight java %}
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
{% endhighlight %}

WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

{% highlight java %}
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
{% endhighlight %}

Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

{% highlight java %}
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply(Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});
{% endhighlight %}

Window Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

{% highlight java %}
windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
{% endhighlight %}

Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string “start-1-2-3-4-5”:

{% highlight java %}
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
{% endhighlight %}

Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight java %}
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
{% endhighlight %}

Union
DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams.

Note: If you union a data stream with itself you will get each element twice in the resulting stream.

{% highlight java %}
dataStream.union(otherStream1, otherStream2, ...);
{% endhighlight %}

Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

{% highlight java %}
dataStream.join(otherStream)
    .where().equalTo()
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
{% endhighlight %}

Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

{% highlight java %}
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
{% endhighlight %}

Connect
DataStream,DataStream → ConnectedStreams

“Connects” two data streams retaining their types, allowing for shared state between the two streams.

{% highlight java %}
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
{% endhighlight %}

CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream.

{% highlight java %}
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
{% endhighlight %}

Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

{% highlight java %}
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
{% endhighlight %}

Select
SplitStream → DataStream

Select one or more streams from a split stream.

{% highlight java %}
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
{% endhighlight %}

Iterate
DataStream → IterativeStream → DataStream

Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

{% highlight java %}
IterativeStream<Integer> iteration = initialStream.iterate();
DataStream<Integer> iterationBody = iteration.map(/*do something*/);
DataStream<Integer> feedback = iterationBody.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Integer> output = iterationBody.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});
{% endhighlight %}

Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

{% highlight java %}
stream.assignTimestamps (new TimeStampExtractor() {...});
{% endhighlight %}
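The “rolling” behaviour of Reduce and Fold described above can be pictured without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: the class `RollingSketch` and its methods `entry`, `rollingReduce` and `rollingFold` are hypothetical names introduced only for illustration, and a list of key/value pairs stands in for a keyed stream. It assumes that one output is emitted per input element and that the reduce state is kept separately per key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical illustration class; not part of the Flink API.
final class RollingSketch {

    // Small helper that builds a (key, value) pair standing in for a keyed record.
    static <K, V> Map.Entry<K, V> entry(K key, V value) {
        return new SimpleEntry<>(key, value);
    }

    // Rolling reduce: for every input element, emit the reduction of all
    // elements seen so far for that element's key.
    static <K, T> List<T> rollingReduce(List<Map.Entry<K, T>> keyedInput,
                                        BinaryOperator<T> reducer) {
        Map<K, T> lastReduced = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (Map.Entry<K, T> element : keyedInput) {
            // merge() stores the value as-is for a new key,
            // otherwise it applies reducer(lastReducedValue, currentValue).
            T updated = lastReduced.merge(element.getKey(), element.getValue(), reducer);
            emitted.add(updated);
        }
        return emitted;
    }

    // Rolling fold for a single key: starts from an initial value and emits
    // the folded value after every element.
    static <T, R> List<R> rollingFold(List<T> input, R initial, BiFunction<R, T, R> folder) {
        List<R> emitted = new ArrayList<>();
        R current = initial;
        for (T element : input) {
            current = folder.apply(current, element);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> keyed = Arrays.asList(
                entry("a", 1), entry("b", 10), entry("a", 2), entry("a", 3), entry("b", 20));
        System.out.println(rollingReduce(keyed, Integer::sum));
        System.out.println(rollingFold(Arrays.asList(1, 2, 3, 4, 5), "start", (s, i) -> s + "-" + i));
    }
}
```

In this sketch the partial sums for key `a` and key `b` evolve independently, which mirrors the statement that the reduce operates on a keyed data stream.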

    <tr>
      <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
      <td>
        <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
{% highlight scala %}

dataStream.flatMap { str => str.split(" ") }
{% endhighlight %}
      </td>
    </tr>

Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

{% highlight scala %}
dataStream.filter { _ != 0 }
{% endhighlight %}

KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

{% highlight scala %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
{% endhighlight %}

Reduce
KeyedStream → DataStream

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

{% highlight scala %}
keyedStream.reduce { _ + _ }
{% endhighlight %}

Fold
KeyedStream → DataStream

A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence “start-1”, “start-1-2”, “start-1-2-3”, ...

{% highlight scala %}
val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i })
{% endhighlight %}

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight scala %}
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
{% endhighlight %}

Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

{% highlight scala %}
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
{% endhighlight %}

WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

{% highlight scala %}
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
{% endhighlight %}

Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

{% highlight scala %}
windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }

{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Window Reduce</strong><br>WindowedStream &rarr; DataStream</td>
      <td>
        <p>Applies a functional reduce function to the window and returns the reduced value.</p>
{% highlight scala %}

windowedStream.reduce { _ + _ }
{% endhighlight %}
      </td>
    </tr>

Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string “start-1-2-3-4-5”:

{% highlight scala %}
val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })
{% endhighlight %}

Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight scala %}
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
{% endhighlight %}

Union
DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams.

Note: If you union a data stream with itself you will get each element twice in the resulting stream.

{% highlight scala %}
dataStream.union(otherStream1, otherStream2, ...)
{% endhighlight %}

Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

{% highlight scala %}
dataStream.join(otherStream)
    .where().equalTo()
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }
{% endhighlight %}

Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

{% highlight scala %}
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
{% endhighlight %}

Connect
DataStream,DataStream → ConnectedStreams

“Connects” two data streams retaining their types, allowing for shared state between the two streams.

{% highlight scala %}
val someStream : DataStream[Int] = ...
val otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
{% endhighlight %}

CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream.

{% highlight scala %}
connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
{% endhighlight %}

Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

{% highlight scala %}
val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)
{% endhighlight %}

Select
SplitStream → DataStream

Select one or more streams from a split stream.

{% highlight scala %}
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
{% endhighlight %}

Iterate
DataStream → IterativeStream → DataStream

Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

{% highlight scala %}
initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}
{% endhighlight %}

Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

{% highlight scala %}
stream.assignTimestamps { timestampExtractor }
{% endhighlight %}
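The Aggregations entries above distinguish min from minBy: min returns the minimum value of the field, whereas minBy returns the element that holds it. The following plain-Java sketch makes the difference visible on a tiny input. It is not Flink code: `MinVsMinBySketch`, `tuple`, `rollingMin` and `rollingMinBy` are hypothetical names, a `Map.Entry<String, Integer>` stands in for a two-field tuple, and all elements are assumed to share one key. How min fills the non-aggregated field (here: it keeps the first element's value) is an assumption of this sketch.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the Flink API.
final class MinVsMinBySketch {

    // A two-field record: getKey() plays the role of field 0, getValue() of field 1.
    static Map.Entry<String, Integer> tuple(String f0, Integer f1) {
        return new SimpleEntry<>(f0, f1);
    }

    // Rolling min on field 1: only the aggregated field is updated.
    // Assumption: the other field keeps the value of the first element.
    static List<Map.Entry<String, Integer>> rollingMin(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        Map.Entry<String, Integer> current = null;
        for (Map.Entry<String, Integer> element : input) {
            if (current == null) {
                current = element;
            } else if (element.getValue() < current.getValue()) {
                current = tuple(current.getKey(), element.getValue());
            }
            emitted.add(current);
        }
        return emitted;
    }

    // Rolling minBy on field 1: the whole element with the minimum is kept.
    static List<Map.Entry<String, Integer>> rollingMinBy(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        Map.Entry<String, Integer> current = null;
        for (Map.Entry<String, Integer> element : input) {
            if (current == null || element.getValue() < current.getValue()) {
                current = element;
            }
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input =
                Arrays.asList(tuple("x", 5), tuple("y", 3), tuple("z", 4));
        System.out.println("min:   " + rollingMin(input));
        System.out.println("minBy: " + rollingMinBy(input));
    }
}
```

After the second element, the min variant reports the value 3 next to the field `x` of the first element, while the minBy variant reports the complete element `(y, 3)`.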

Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:

{% highlight scala %}
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}
{% endhighlight %}

is not supported by the API out-of-the-box. To use this feature, you should use a Scala API extension.

The following transformations are available on data streams of Tuples:

Physical partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.
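As a point of reference for the explicit partitioning functions, the key-based hash partitioning mentioned under KeyBy can be pictured with a few lines of plain Java. This is a deliberately simplified sketch, not Flink's actual key-to-partition mapping: the class `KeyPartitionSketch` and the method `partitionFor` are hypothetical, and reducing `hashCode()` modulo the number of partitions is an assumption made for illustration. It also shows why the KeyBy entry rules out arrays and POJOs that rely on `Object.hashCode()`: two arrays with the same content are not equal and are not guaranteed to hash alike.

```java
import java.util.Arrays;

// Hypothetical illustration class; not part of the Flink API.
final class KeyPartitionSketch {

    // Simplified assumption: the partition is derived only from the key's hashCode().
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Equal String keys always land in the same partition because
        // String.hashCode() depends only on the content.
        String first = "user-42";
        String second = new String("user-42");
        System.out.println(partitionFor(first, 4) == partitionFor(second, 4));

        // Arrays use identity-based equals() and hashCode(), so equal content
        // does not make two arrays the same key.
        int[] a = {1, 2};
        int[] b = {1, 2};
        System.out.println(a.equals(b));
        // A content-based hash exists, but it is not what hashCode() returns.
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b));
    }
}
```

The design point is that a key type must provide a stable, content-based `hashCode()`; otherwise records that are logically identical could end up in different partitions.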

    <div style="text-align: center">
        <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Connection pattern of the rescale transformation" />
        </div>


    <p>
                {% highlight java %}

dataStream.rescale(); {% endhighlight %}

    </p>
  </td>
</tr>
    </p>
    <p>
        Please see this figure for a visualization of the connection pattern in the above
        example:
    </p>

    <div style="text-align: center">
        <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Connection pattern of the rescale transformation" />
        </div>


    <p>
                {% highlight scala %}

dataStream.rescale() {% endhighlight %}

    </p>
  </td>
</tr>
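The connection pattern that the rescale figure visualizes can also be written down as index arithmetic. The sketch below is plain Java and only an approximation under stated assumptions: it assumes that each upstream subtask sends its elements to a fixed subset of downstream subtasks, that the subset depends on both parallelisms, and that one parallelism is an exact multiple of the other. The class `RescaleSketch`, the method `downstreamTargets`, and the concrete choice of which indices are paired are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of the Flink API.
final class RescaleSketch {

    // Returns the downstream subtask indices that one upstream subtask feeds.
    static List<Integer> downstreamTargets(int upstreamIndex,
                                           int upstreamParallelism,
                                           int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (downstreamParallelism % upstreamParallelism == 0) {
            // Fan-out: each upstream subtask owns a contiguous block of downstream subtasks.
            int fanOut = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < fanOut; i++) {
                targets.add(upstreamIndex * fanOut + i);
            }
        } else if (upstreamParallelism % downstreamParallelism == 0) {
            // Fan-in: several upstream subtasks share one downstream subtask.
            int fanIn = upstreamParallelism / downstreamParallelism;
            targets.add(upstreamIndex / fanIn);
        } else {
            throw new IllegalArgumentException(
                    "this sketch only covers parallelisms that are multiples of each other");
        }
        return targets;
    }

    public static void main(String[] args) {
        for (int upstream = 0; upstream < 2; upstream++) {
            System.out.println("upstream " + upstream + " -> " + downstreamTargets(upstream, 2, 4));
        }
    }
}
```

In contrast to a full redistribution, no upstream subtask in this sketch ever sends to all downstream subtasks, which is the property the figure is meant to convey.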

Task chaining and resource groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().
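To picture what chaining changes at runtime, the following plain-Java sketch runs the same two transformations once in a single thread and once in two threads with a handoff between them. It is an analogy only and does not reflect how Flink implements chaining or data exchange: the class `ChainingSketch`, its methods `chained` and `unchained`, and the use of a `BlockingQueue` as the handoff are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical illustration class; not part of the Flink API.
final class ChainingSketch {

    // "Chained": both transformations run back to back in the calling thread.
    static List<Integer> chained(List<Integer> input,
                                 Function<Integer, Integer> first,
                                 Function<Integer, Integer> second) {
        Function<Integer, Integer> chain = first.andThen(second);
        List<Integer> output = new ArrayList<>();
        for (Integer value : input) {
            output.add(chain.apply(value));
        }
        return output;
    }

    // "Unchained": the first transformation runs in its own thread and hands
    // every intermediate result to the second one through a queue.
    static List<Integer> unchained(List<Integer> input,
                                   Function<Integer, Integer> first,
                                   Function<Integer, Integer> second) {
        BlockingQueue<Optional<Integer>> handoff = new LinkedBlockingQueue<>();
        Thread upstream = new Thread(() -> {
            for (Integer value : input) {
                handoff.add(Optional.of(first.apply(value)));
            }
            handoff.add(Optional.empty()); // end-of-input marker
        });
        upstream.start();

        List<Integer> output = new ArrayList<>();
        try {
            while (true) {
                Optional<Integer> next = handoff.take();
                if (!next.isPresent()) {
                    break;
                }
                output.add(second.apply(next.get()));
            }
            upstream.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for upstream", e);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);
        System.out.println(chained(input, x -> x + 1, x -> x * 2));
        System.out.println(unchained(input, x -> x + 1, x -> x * 2));
    }
}
```

Both variants compute the same results; the difference lies in the extra thread and the handoff per element, which is the overhead that co-locating transformations in one thread avoids.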

A resource group is a slot in Flink; see slots. You can manually isolate operators in separate slots if desired.

{% top %}