---
title: "Flink DataStream API Programming Guide"

# Top-level navigation
top-nav-group: apis
top-nav-pos: 2
top-nav-title: Streaming Guide (DataStream API)

# Sub-level navigation
sub-nav-group: streaming
sub-nav-group-title: Streaming Guide
sub-nav-pos: 1
sub-nav-title: DataStream API
---

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may, for example, write the data to files or to standard output (e.g., the command line terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs. The execution can happen in a local JVM or on clusters of many machines.

Please see [basic concepts]({{ site.baseurl }}/apis/common/index.html) for an introduction to the basic concepts of the Flink API.

In order to create your own Flink DataStream program, we encourage you to start with [anatomy of a Flink Program]({{ site.baseurl }}/apis/common/index.html#anatomy-of-a-flink-program) and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.

* This will be replaced by the TOC
{:toc}

Example Program

The following program is a complete, working example of a streaming window word count application that counts the words coming from a web socket in 5-second windows. You can copy & paste the code to run it locally.

{% highlight java %}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

    dataStream.print();

    env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

}
{% endhighlight %}

{% highlight scala %}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}
{% endhighlight %}

To run the example program, start the input stream with netcat first from a terminal:

{% highlight bash %}
nc -lk 9999
{% endhighlight %}

Just type some words hitting return for a new word. These will be the input to the word count program. If you want to see counts greater than 1, type the same word again and again within 5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).

{% top %}

DataStream Transformations

Data transformations transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated topologies.

This section gives a description of all the available transformations.

The following transformations are available in the Java API:

**FlatMap**: DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

{% highlight java %}
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
{% endhighlight %}

**Filter**: DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

{% highlight java %}
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
{% endhighlight %}

**KeyBy**: DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

{% highlight java %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
{% endhighlight %}

**Reduce**: KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

{% highlight java %}
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
{% endhighlight %}

**Fold**: KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

{% highlight java %}
DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
{% endhighlight %}

**Aggregations**: KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight java %}
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
{% endhighlight %}

**Window**: KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

{% highlight java %}
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
{% endhighlight %}

**WindowAll**: DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

{% highlight java %}
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
{% endhighlight %}

**Window Apply**: WindowedStream → DataStream, AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

{% highlight java %}
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply(Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(new Integer(sum));
    }
});
{% endhighlight %}

**Window Reduce**: WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

{% highlight java %}
windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
{% endhighlight %}

**Window Fold**: WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

{% highlight java %}
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
{% endhighlight %}

**Aggregations on windows**: WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight java %}
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
{% endhighlight %}

**Union**: DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

{% highlight java %}
dataStream.union(otherStream1, otherStream2, ...);
{% endhighlight %}

**Window Join**: DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

{% highlight java %}
dataStream.join(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});
{% endhighlight %}

**Window CoGroup**: DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

{% highlight java %}
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
{% endhighlight %}

**Connect**: DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types, allowing for shared state between the two streams.

{% highlight java %}
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
{% endhighlight %}

**CoMap, CoFlatMap**: ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream:

{% highlight java %}
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
{% endhighlight %}

**Split**: DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

{% highlight java %}
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
{% endhighlight %}

**Select**: SplitStream → DataStream

Select one or more streams from a split stream.

{% highlight java %}
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");
{% endhighlight %}

**Iterate**: DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

{% highlight java %}
IterativeStream<Integer> iteration = initialStream.iterate();
DataStream<Integer> iterationBody = iteration.map(/* do something */);
DataStream<Integer> feedback = iterationBody.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Integer> output = iterationBody.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});
{% endhighlight %}

**Extract Timestamps**: DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.

{% highlight java %}
stream.assignTimestamps(new TimeStampExtractor() {...});
{% endhighlight %}

The same transformations are available in the Scala API:

**FlatMap**: DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

{% highlight scala %}
dataStream.flatMap { str => str.split(" ") }
{% endhighlight %}

**Filter**: DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

{% highlight scala %}
dataStream.filter { _ != 0 }
{% endhighlight %}

**KeyBy**: DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

{% highlight scala %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
{% endhighlight %}

**Reduce**: KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

{% highlight scala %}
keyedStream.reduce { _ + _ }
{% endhighlight %}

**Fold**: KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

{% highlight scala %}
val result: DataStream[String] = keyedStream.fold("start", (str, i) => { str + "-" + i })
{% endhighlight %}

**Aggregations**: KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight scala %}
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
{% endhighlight %}

**Window**: KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

{% highlight scala %}
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
{% endhighlight %}

**WindowAll**: DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

{% highlight scala %}
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
{% endhighlight %}

**Window Apply**: WindowedStream → DataStream, AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

{% highlight scala %}
windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
{% endhighlight %}

**Window Reduce**: WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

{% highlight scala %}
windowedStream.reduce { _ + _ }
{% endhighlight %}

**Window Fold**: WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

{% highlight scala %}
val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
{% endhighlight %}

**Aggregations on windows**: WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

{% highlight scala %}
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
{% endhighlight %}

**Union**: DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

{% highlight scala %}
dataStream.union(otherStream1, otherStream2, ...)
{% endhighlight %}

**Window Join**: DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

{% highlight scala %}
dataStream.join(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }
{% endhighlight %}

**Window CoGroup**: DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

{% highlight scala %}
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
{% endhighlight %}

**Connect**: DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types, allowing for shared state between the two streams.

{% highlight scala %}
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
{% endhighlight %}

**CoMap, CoFlatMap**: ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream:

{% highlight scala %}
connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
{% endhighlight %}

**Split**: DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

{% highlight scala %}
val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)
{% endhighlight %}

**Select**: SplitStream → DataStream

Select one or more streams from a split stream.

{% highlight scala %}
val even = split select "even"
val odd = split select "odd"
val all = split.select("even", "odd")
{% endhighlight %}

**Iterate**: DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

{% highlight scala %}
initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/* do something */}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}
{% endhighlight %}

**Extract Timestamps**: DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.

{% highlight scala %}
stream.assignTimestamps { timestampExtractor }
{% endhighlight %}

The following transformations are available on data streams of Tuples:
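For example, the project transformation selects a subset of fields from a tuple stream. A minimal sketch (the tuple types and field indices are illustrative, not taken from the guide):

{% highlight java %}
DataStream<Tuple3<Integer, Double, String>> in = // [...]
// keep only the third and the first field, in that order
DataStream<Tuple2<String, Integer>> out = in.project(2, 0);
{% endhighlight %}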

Physical partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.

**Rescaling**: DataStream → DataStream

Please see this figure for a visualization of the connection pattern of the rescale transformation:

<div style="text-align: center">
    <img src="{{ site.baseurl }}/apis/streaming/fig/rescale.svg" alt="Rescale connection pattern in data streams" />
</div>

{% highlight java %}
dataStream.rescale();
{% endhighlight %}

{% highlight scala %}
dataStream.rescale()
{% endhighlight %}
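Besides rescale(), further partitioning calls exist on DataStream. A minimal sketch, assuming the shuffle, rebalance, broadcast, and partitionCustom methods of this Flink version (the partitioner variable is a placeholder for a user-defined Partitioner):

{% highlight java %}
dataStream.shuffle();     // partition elements randomly
dataStream.rebalance();   // partition elements round-robin, evening out load
dataStream.broadcast();   // send every element to every parallel downstream task

// route each element with a user-defined Partitioner applied to the first tuple field
dataStream.partitionCustom(partitioner, 0);
{% endhighlight %}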

Task chaining and resource groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.
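For illustration, a minimal sketch of how these calls are attached directly after a transformation (MyFilter, MyMapper, OtherMapper, and the group name are placeholders; slotSharingGroup is assumed to be available in this Flink version):

{% highlight java %}
// start a new chain at the first map: the two mappers are chained to each other,
// but not to the preceding filter
someStream.filter(new MyFilter()).map(new MyMapper()).startNewChain().map(new OtherMapper());

// do not chain this map operator to anything
someStream.map(new MyMapper()).disableChaining();

// isolate this operator by putting it into its own slot sharing group
someStream.map(new MyMapper()).slotSharingGroup("isolated");
{% endhighlight %}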

{% top %}

Data Sources

Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources; a minimal custom source sketch follows the list of predefined sources below.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileStream - Creates a stream by appending elements when there are changes to a file.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Collection) - Creates a data stream from the Java java.util.Collection. All elements in the collection must be of the same type.

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(T ...) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer08<>(...)). See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.
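As a minimal sketch of a non-parallel custom source (the class name and emitted values are illustrative, not part of the guide), attached with env.addSource(new CounterSource()):

{% highlight java %}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A simple non-parallel source that emits an increasing counter until cancelled.
public class CounterSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (isRunning) {
            ctx.collect(counter++);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
{% endhighlight %}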

The same applies to the Scala API: sources are created with StreamExecutionEnvironment.addSource(sourceFunction), either using one of the predefined source functions or a custom one as described above. The following predefined stream sources are accessible from the Scala StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileStream - Creates a stream by appending elements when there are changes to a file.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Seq) - Creates a data stream from a Seq. All elements in the collection must be of the same type.

  • fromCollection(Iterator) - Creates a data stream from an iterator.

  • fromElements(elements: _*) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator) - Creates a data stream from an iterator, in parallel.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer08<>(...)). See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.

{% top %}

Data Sinks

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
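As a minimal sketch of a custom sink attached with addSink (the stream and the sink body are illustrative placeholders):

{% highlight java %}
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

stream.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        // hand the record to the external system here, e.g. a database or message queue client
        System.out.println(value);
    }
});
{% endhighlight %}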


Note that the write*() methods on DataStream are mainly intended for debugging purposes. They do not participate in Flink's checkpointing, which means these functions usually have at-least-once semantics. Flushing the data to the target system depends on the implementation of the OutputFormat, so not all elements sent to the OutputFormat immediately show up in the target system. Also, in failure cases, those records might be lost.

For reliable, exactly-once delivery of a stream into a file system, use the flink-connector-filesystem. Also, custom implementations through the .addSink(...) method can participate in Flink's checkpointing for exactly-once semantics.

{% top %}

Iterations

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream, using a split transformation or a filter. Here, we show an example using filters. First, we define an IterativeStream:

{% highlight java %} IterativeStream iteration = input.iterate(); {% endhighlight %}

Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation):

{% highlight java %} DataStream iterationBody = iteration.map(/* this is executed many times */); {% endhighlight %}

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back, and the part of the stream which is propagated forward. These filters can, e.g., define the "termination" logic, where an element is allowed to propagate downstream rather than being fed back.

{% highlight java %}
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream output = iterationBody.filter(/* some other part of the stream */);
{% endhighlight %}

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.
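A one-line sketch, assuming the two-argument closeWith overload is available in this Flink version (feedback is a placeholder for the stream that is fed back):

{% highlight java %}
// true: keep the feedback stream's own partitioning instead of adopting the iteration head's input partitioning
iteration.closeWith(feedback, true);
{% endhighlight %}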

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

{% highlight java %}
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value > 0);
    }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value <= 0);
    }
});
{% endhighlight %}

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream, using a split transformation or a filter. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are distinguished from the elements that are forwarded downstream by using filters.

{% highlight scala %}
val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
  })
{% endhighlight %}

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

{% highlight scala %}
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)
{% endhighlight %}

{% top %}

Execution Parameters

The StreamExecutionEnvironment contains the ExecutionConfig, which allows you to set job-specific configuration values for the runtime.

Please refer to [execution configuration]({{ site.baseurl }}/apis/common/index.html#execution-configuration) for an explanation of most parameters. These parameters pertain specifically to the DataStream API:

  • enableTimestamps() / disableTimestamps(): Attach a timestamp to each event emitted from a source. areTimestampsEnabled() returns the current value.

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval()
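Both parameters are set on the ExecutionConfig obtained from the environment; a minimal sketch (the interval value is illustrative):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// attach timestamps to events emitted from sources
env.getConfig().enableTimestamps();

// emit watermarks automatically every 100 milliseconds
env.getConfig().setAutoWatermarkInterval(100);
{% endhighlight %}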

{% top %}

Fault Tolerance

The Fault Tolerance Documentation describes the options and parameters to enable and configure Flink's checkpointing mechanism.

Controlling Latency

By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.

Usage:

{% highlight java %}
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
{% endhighlight %}

{% highlight scala %}
env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
{% endhighlight %}

To maximize throughput, set setBufferTimeout(-1) which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.
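For example, with the values suggested above:

{% highlight java %}
// maximize throughput: buffers are flushed only when they are full
env.setBufferTimeout(-1);

// trade some throughput for low latency: flush buffers every 5 ms
env.setBufferTimeout(5);
{% endhighlight %}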

{% top %}

Debugging

Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.

Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section gives some hints on how to ease the development of Flink programs.

Local Execution Environment

A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.

A LocalEnvironment is created and used as follows:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
{% endhighlight %}

{% highlight scala %}
val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)
// build your program

env.execute()
{% endhighlight %}

Collection Data Sources

Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.

Collection data sources can be used as follows:

{% highlight java %}
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
{% endhighlight %}

{% highlight scala %}
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
{% endhighlight %}

Note: Currently, the collection data source requires that data types and iterators implement Serializable. Furthermore, collection data sources cannot be executed in parallel (parallelism = 1).

Iterator Data Sink

Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:

{% highlight java %}
import org.apache.flink.contrib.streaming.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
{% endhighlight %}

{% highlight scala %}
import org.apache.flink.contrib.streaming.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter

val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala
{% endhighlight %}

{% top %}