title: “Flink DataStream API Programming Guide” is_beta: false

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for example write the data to files, or to standard output (for example the command line terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.

In order to create your own Flink DataStream program, we encourage you to start with the program skeleton and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.

  • This will be replaced by the TOC {:toc}

Example Program

The following program is a complete, working example of streaming window word count application, that counts the words coming from a web socket in 5 second windows. You can copy & paste the code to run it locally.

{% highlight java %} public class WindowWordCount {

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.of(5, TimeUnit.SECONDS))
            .sum(1);

    dataStream.print();

    env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

} {% endhighlight %}

object WindowWordCount { def main(args: Array[String]) {

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .keyBy(0)
  .timeWindow(Time.of(5, TimeUnit.SECONDS))
  .sum(1)

counts.print

env.execute("Window Stream WordCount")

} } {% endhighlight %}

To run the example program, start the input stream with netcat first from a terminal:

nc -lk 9999

Just type some words hitting return for a new word. These will be the input to the word count program. If you want to see counts greater than 1, type the same word again and again within 5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).

Back to top

Linking with Flink

To write programs with Flink, you need to include the Flink DataStream library corresponding to your programming language in your project.

The simplest way to do this is to use one of the quickstart scripts: either for [Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They create a blank project from a template (a Maven Archetype), which sets up everything for you. To manually create the project, you can use the archetype and create a project by calling:

The archetypes are working for stable releases and preview versions (-SNAPSHOT).

If you want to add Flink to an existing Maven project, add the following entry to your dependencies section in the pom.xml file of your project:

In order to create your own Flink program, we encourage you to start with the program skeleton and gradually add your own transformations.

Back to top

Program Skeleton

As presented in the example, Flink DataStream programs look like regular Java programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

We will now give an overview of each of those steps, please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

{% highlight java %} getExecutionEnvironment()

createLocalEnvironment() createLocalEnvironment(int parallelism) createLocalEnvironment(int parallelism, Configuration customConfiguration)

createRemoteEnvironment(String host, int port, String... jarFiles) createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles) {% endhighlight %}

Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources the execution environment has several methods to read from files, sockets, and external systems using various methods. To just read data from a socket (useful also for debugging), you can use:

{% highlight java %} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream lines = env.socketTextStream(“localhost”, 9999) {% endhighlight %}

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a socket, transform again, combine with other DataStreams, or push to an external system (e.g., a message queue, or a file system). You apply transformations by calling methods on DataStream with your own custom transformation functions. For example, a map transformation looks like this:

{% highlight java %} DataStream input = ...;

DataStream intValues = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } }); {% endhighlight %}

This will create a new DataStream by converting every String in the original stream to an Integer. For more information and a list of all the transformations, please refer to Transformations.

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.

{% highlight java %} writeAsText(String path, ...) writeAsCsv(String path, ...) writeToSocket(String hostname, int port, ...)

print()

addSink(...) {% endhighlight %}

Once you specified the complete program you need to trigger the program execution by calling execute() on StreamExecutionEnvironment. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.

{% highlight java %} env.execute(); {% endhighlight %}

As presented in the example, Flink DataStream programs look like regular Scala programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

We will now give an overview of each of those steps, please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

{% highlight scala %} def getExecutionEnvironment

def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())

def createRemoteEnvironment(host: String, port: Int, jarFiles: String*) def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*) {% endhighlight %}

Typically, you only need to use getExecutionEnvironment, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from you program, and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources the execution environment has several methods to read from files, sockets, and external systems using various methods. To just read data from a socket (useful also for debugginf), you can use:

{% highlight scala %} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment

DataStream lines = env.socketTextStream(“localhost”, 9999) {% endhighlight %}

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a file, transform again, combine with other DataStreams, or push to an external system. You apply transformations by calling methods on DataStream with your own custom transformation function. For example, a map transformation looks like this:

{% highlight scala %} val input: DataStream[String] = ...

val mapped = input.map { x => x.toInt } {% endhighlight %}

This will create a new DataStream by converting every String in the original set to an Integer. For more information and a list of all the transformations, please refer to Transformations.

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.

{% highlight scala %} writeAsText(path: String, ...) writeAsCsv(path: String, ...) writeToSocket(hostname: String, port: Int, ...)

print()

addSink(...) {% endhighlight %}

Once you specified the complete program you need to trigger the program execution by calling execute on StreamExecutionEnvironment. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.

{% highlight scala %} env.execute() {% endhighlight %}

Back to top

DataStream Abstraction

A DataStream is a possibly unbounded immutable collection of data items of a the same type.

Transformations may return different subtypes of DataStream allowing specialized transformations. For example the keyBy(…) method returns a KeyedDataStream which is a stream of data that is logically partitioned by a certain key, and can be further windowed.

Back to top

Lazy Evaluation

All Flink DataStream programs are executed lazily: When the program‘s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program’s plan. The operations are actually executed when the execution is explicitly triggered by an execute() call on the StreamExecutionEnvironment object. Whether the program is executed locally or on a cluster depends on the type of StreamExecutionEnvironment.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.

Back to top

Transformations

Data transformations transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated topologies.

This section gives a description of all the available transformations.

    <tr>
      <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
      <td>
        <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
{% highlight java %}

dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } }); {% endhighlight %} Filter
DataStream → DataStream Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values: {% highlight java %} dataStream.filter(new FilterFunction() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } }); {% endhighlight %} KeyBy
DataStream → KeyedStream Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedDataStream. {% highlight java %} dataStream.keyBy(“someKey”) // Key by field “someKey” dataStream.keyBy(0) // Key by the first element of a Tuple {% endhighlight %} Reduce
KeyedStream → DataStream A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums: {% highlight java %} keyedStream.reduce(new ReduceFunction() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } }); {% endhighlight %} Fold
DataStream → DataStream A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that creates a stream of partial sums: {% highlight java %} keyedStream.fold(0, new ReduceFunction() { @Override public Integer fold(Integer accumulator, Integer value) throws Exception { return accumulator + value; } }); {% endhighlight %} Aggregations
KeyedStream → DataStream Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). {% highlight java %} keyedStream.sum(0); keyedStream.sum(“key”); keyedStream.min(0); keyedStream.min(“key”); keyedStream.max(0); keyedStream.max(“key”); keyedStream.minBy(0); keyedStream.minBy(“key”); keyedStream.maxBy(0); keyedStream.maxBy(“key”); {% endhighlight %} Window
KeyedStream → WindowedStream Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. {% highlight java %} dataStream.keyBy(0).window(TumblingTimeWindows.of(5, TimeUnit.SECONDS)); // Last 5 seconds of data {% endhighlight %} WindowAll
DataStream → AllWindowedStream Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator. {% highlight java %} dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data {% endhighlight %} Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead. {% highlight java %} windowedStream.apply (new WindowFunction<Tuple2<String,Integer>,Integer>, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }; {% endhighlight %} Window Reduce
WindowedStream → DataStream Applies a functional reduce function to the window and returns the reduced value. {% highlight java %} windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } }; {% endhighlight %} Window Fold
WindowedStream → DataStream Applies a functional fold function to the window and returns the folded value. {% highlight java %} windowedStream.fold (new Tuple2<String,Integer>(“Sum of all”, 0), new FoldFunction<Tuple2<String,Integer>() { public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Tuple2<String, Integer> value) throws Exception { return new Tuple2<String,Integer>(acc.f0, acc.f1 + value.f1); } }; {% endhighlight %} Aggregations on windows
WindowedStream → DataStream Aggregates the contents of a window. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). {% highlight java %} windowedStream.sum(0); windowedStream.sum(“key”); windowedStream.min(0); windowedStream.min(“key”); windowedStream.max(0); windowedStream.max(“key”); windowedStream.minBy(0); windowedStream.minBy(“key”); windowedStream.maxBy(0); windowedStream.maxBy(“key”); {% endhighlight %} Union
DataStream* → DataStream Union of two or more data streams creating a new stream containing all the elements from all the streams. Node: If you union a data stream with itself you will still only get each element once. {% highlight java %} dataStream.union(otherStream1, otherStream2, ...); {% endhighlight %} Window Join
DataStream,DataStream → DataStream Join two data streams on a given key and a common window. {% highlight java %} dataStream.join(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply (new JoinFunction () {...}); {% endhighlight %} Window CoGroup
DataStream,DataStream → DataStream Cogroups two data streams on a given key and a common window. {% highlight java %} dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply (new CoGroupFunction () {...}); {% endhighlight %} Connect
DataStream,DataStream → ConnectedStreams “Connects” two data streams retaining their types. Connect allowing for shared state between the two streams. {% highlight java %} DataStream someStream = //... DataStream otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); {% endhighlight %} CoMap, CoFlatMap
ConnectedStreams → DataStream Similar to map and flatMap on a connected data stream {% highlight java %} connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; }

@Override
public Boolean map2(String value) {
    return false;
}

}); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

@Override public void flatMap1(Integer value, Collector out) { out.collect(value.toString()); }

@Override public void flatMap2(String value, Collector out) { for (String word: value.split(" ")) { out.collect(word); } } }); {% endhighlight %} Split
DataStream → SplitStream Split the stream into two or more streams according to some criterion. {% highlight java %} SplitStream split = someDataStream.split(new OutputSelector() { @Override public Iterable select(Integer value) { List output = new ArrayList(); if (value % 2 == 0) { output.add(“even”); } else { output.add(“odd”); } return output; } }); {% endhighlight %} Select
SplitStream → DataStream Select one or more streams from a split stream. {% highlight java %} SplitStream split; DataStream even = split.select(“even”); DataStream odd = split.select(“odd”); DataStream all = split.select(“even”,“odd”); {% endhighlight %} Iterate
DataStream → IterativeStream → DataStream Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description. {% highlight java %} IterativeStream iteration = initialStream.iterate(); DataStream iterationBody = iteration.map (/do something/); DataStream feedback = iterationBody.filter(new FilterFunction(){ @Override public boolean filter(Integer value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream output = iterationBody.filter(new FilterFunction(){ @Override public boolean filter(Integer value) throws Exception { return value <= 0; } }); {% endhighlight %} Extract Timestamps
DataStream → DataStream Extracts timestamps from records in order to work with windows that use event time semantics. See working with time. {% highlight java %} stream.assignTimestamps (new TimeStampExtractor() {...}); {% endhighlight %}

    <tr>
      <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
      <td>
        <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
{% highlight scala %}

dataStream.flatMap { str => str.split(" ") } {% endhighlight %} Filter
DataStream → DataStream Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values: {% highlight scala %} dataStream.filter { _ != 0 } {% endhighlight %} KeyBy
DataStream → KeyedStream Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedDataStream. {% highlight scala %} dataStream.keyBy(“someKey”) // Key by field “someKey” dataStream.keyBy(0) // Key by the first element of a Tuple {% endhighlight %} Reduce
KeyedStream → DataStream A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums: {% highlight scala %} keyedStream.reduce { _ + _ } {% endhighlight %} Fold
DataStream → DataStream A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that creates a stream of partial sums: {% highlight scala %} keyedStream.fold { 0, _ + _ } {% endhighlight %} Aggregations
KeyedStream → DataStream Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). {% highlight scala %} keyedStream.sum(0) keyedStream.sum(“key”) keyedStream.min(0) keyedStream.min(“key”) keyedStream.max(0) keyedStream.max(“key”) keyedStream.minBy(0) keyedStream.minBy(“key”) keyedStream.maxBy(0) keyedStream.maxBy(“key”) {% endhighlight %} Window
KeyedStream → WindowedStream Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows. {% highlight scala %} dataStream.keyBy(0).window(TumblingTimeWindows.of(5, TimeUnit.SECONDS)) // Last 5 seconds of data // Last 5 seconds of data {% endhighlight %} WindowAll
DataStream → AllWindowedStream Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator. {% highlight scala %} dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data {% endhighlight %} Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead. {% highlight scala %} windowedStream.apply { applyFunction } {% endhighlight %} Window Reduce
WindowedStream → DataStream Applies a functional reduce function to the window and returns the reduced value. {% highlight scala %} windowedStream.reduce { _ + _ } {% endhighlight %} Window Fold
WindowedStream → DataStream Applies a functional fold function to the window and returns the folded value. {% highlight java %} windowedStream.fold { 0, _ + _ } {% endhighlight %} Aggregations on windows
WindowedStream → DataStream Aggregates the contents of a window. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). {% highlight scala %} windowedStream.sum(0) windowedStream.sum(“key”) windowedStream.min(0) windowedStream.min(“key”) windowedStream.max(0) windowedStream.max(“key”) windowedStream.minBy(0) windowedStream.minBy(“key”) windowedStream.maxBy(0) windowedStream.maxBy(“key”) {% endhighlight %} Union
DataStream* → DataStream Union of two or more data streams creating a new stream containing all the elements from all the streams. Node: If you union a data stream with itself you will still only get each element once. {% highlight scala %} dataStream.union(otherStream1, otherStream2, ...) {% endhighlight %} Window Join
DataStream,DataStream → DataStream Join two data streams on a given key and a common window. {% highlight scala %} dataStream.join(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply { ... } {% endhighlight %} Window CoGroup
DataStream,DataStream → DataStream Cogroups two data streams on a given key and a common window. {% highlight scala %} dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply {} {% endhighlight %} Connect
DataStream,DataStream → ConnectedStreams “Connects” two data streams retaining their types, allowing for shared state between the two streams. {% highlight scala %} someStream : DataStream[Int] = ... otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream) {% endhighlight %} CoMap, CoFlatMap
ConnectedStreams → DataStream Similar to map and flatMap on a connected data stream {% highlight scala %} connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false ) {% endhighlight %} Split
DataStream → SplitStream Split the stream into two or more streams according to some criterion. {% highlight scala %} val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List(“even”) case 1 => List(“odd”) } ) {% endhighlight %} Select
SplitStream → DataStream Select one or more streams from a split stream. {% highlight scala %}

val even = split select “even” val odd = split select “odd” val all = split.select(“even”,“odd”) {% endhighlight %} Iterate
DataStream → IterativeStream → DataStream Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description. {% highlight java %} initialStream. iterate { iteration => { val iterationBody = iteration.map {/do something/} (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } } IterativeStream iteration = initialStream.iterate(); DataStream iterationBody = iteration.map (/do something/); DataStream feedback = iterationBody.filter ( _ > 0); iteration.closeWith(feedback); {% endhighlight %} Extract Timestamps
DataStream → DataStream Extracts timestamps from records in order to work with windows that use event time semantics. See working with time. {% highlight scala %} stream.assignTimestamps { timestampExtractor } {% endhighlight %}

The following transformations are available on data streams of Tuples:

Physical partitioning

Flink also gives low-level control (if desired) on the exact stream partitioning after a transformation, via the following functions.

Task chaining and resource groups

Chaining two subsequent transformations means col-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.

Back to top

Specifying Keys

The keyBy transformation requires that a key is defined on its argument DataStream.

A DataStream is keyed as {% highlight java %} DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/define key here/) .window(/define window here/); {% endhighlight %}

The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data stream types into keys and values. Keys are “virtual”: they are defined as functions over the actual data to guide the grouping operator.

See the relevant section of the DataSet API documentation on how to specify keys. Just replace DataSet with DataStream, and groupBy with keyBy.

Passing Functions to Flink

Some transformations take user-defined functions as arguments.

See the relevant section of the DataSet API documentation.

Back to top

Data Types

Flink places some restrictions on the type of elements that are used in DataStreams and in results of transformations. The reason for this is that the system analyzes the types to determine efficient execution strategies.

See the relevant section of the DataSet API documentation.

Back to top

Data Sources

Sources can by created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - create a stream by appending elements when there are changes to a file

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Collection) - Creates a data stream from the Java Java.util.Collection. All elements in the collection must be of the same type.

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(T ...) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attache a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details.

Sources can by created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - create a stream by appending elements when there are changes to a file

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Seq) - Creates a data stream from the Java Java.util.Collection. All elements in the collection must be of the same type.

  • fromCollection(Iterator) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(elements: _*) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attache a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details.

Back to top

Execution Configuration

The StreamExecutionEnvironment also contains the ExecutionConfig which allows to set job specific configuration values for the runtime.

See the relevant section of the DataSet API documentation.

Parameters in the ExecutionConfig that pertain specifically to the DataStream API are:

  • enableTimestamps() / disableTimestamps(): Attach a timestamp to each event emitted from a source. areTimestampsEnabled() returns the current value.

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval()

Back to top

Data Sinks

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOuputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / strandard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOuputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / strandard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

Back to top

Debugging

Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.

Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section give some hints how to ease the development of Flink programs.

Local Execution Environment

A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalEnvironement from an IDE, you can set breakpoints in your code and easily debug your program.

A LocalEnvironment is created and used as follows:

DataStream lines = env.addSource(/* some source */); // build your program

env.execute(); {% endhighlight %}

{% highlight scala %} val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */) // build your program

env.execute() {% endhighlight %}

Collection Data Sources

Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.

Collection data sources can be used as follows:

// Create a DataStream from a list of elements DataStream myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection List<Tuple2<String, Integer>> data = ... DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator Iterator longIt = ... DataStream myLongs = env.fromCollection(longIt, Long.class); {% endhighlight %}

// Create a DataStream from a list of elements val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection val data: Seq[(String, Int)] = ... val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator val longIt: Iterator[Long] = ... val myLongs = env.fromCollection(longIt) {% endhighlight %}

Note: Currently, the collection data source requires that data types and iterators implement Serializable. Furthermore, collection data sources can not be executed in parallel ( parallelism = 1).

Back to top

Windows

Working with Time

Windows are typically groups of events within a certain time period. Reasoning about time and windows assumes a definition of time. Flink has support for three kinds of time:

  • Processing time: Processing time is simply the wall clock time of the machine that happens to be executing the transformation. Processing time is the simplest notion of time and provides the best performance. However, in distributed and asynchronous environments processing time does not provide determinism.

  • Event time: Event time is the time that each individual event occurred. This time is typically embedded within the records before they enter Flink or can be extracted from their contents. When using event time, out-of-order events can be properly handled. For example, an event with a lower timestamp may arrive after an event with a higher timestamp, but transformations will handle these events correctly. Event time processing provides predictable results, but incurs more latency, as out-of-order events need to be buffered

  • Ingestion time: Ingestion time is the time that events enter Flink. In particular, the timestamp of an event is assigned by the source operator as the current wall clock time of the machine that executes the source task at the time the records enter the Flink source. Ingestion time is more predictable than processing time, and gives lower latencies than event time as the latency does not depend on external systems. Ingestion time provides thus a middle ground between processing time and event time. Ingestion time is a special case of event time (and indeed, it is treated by Flink identically to event time).

When dealing with event time, transformations need to avoid indefinite wait times for events to arrive. Watermarks provide the mechanism to control the event time-processing time skew. Watermarks are emitted by the sources. A watermark with a certain timestamp denotes the knowledge that no event with timestamp lower than the timestamp of the watermark will ever arrive.

You can specify the semantics of time in a Flink DataStream program using StreamExecutionEnviroment, as

The default value is TimeCharacteristic.ProcessingTime, so in order to write a program with processing time semantics nothing needs to be specified (e.g., the first example in this guide follows processing time semantics).

In order to work with event time semantics, you need to follow four steps:

  • Set env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  • Use DataStream.assignTimestamps(...) in order to tell Flink how timestamps relate to events (e.g., which record field is the timestamp)

  • Set enableTimestamps(), as well the interval for watermark emission (setAutoWatermarkInterval(long milliseconds)) in ExecutionConfig.

For example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned by the system that generates these data streams), and we know that the lag between the current processing time and the timestamp of an event is never more than 1 second:

@Override
public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
    return element.f0 - 1000;
}

@Override
public long getCurrentWatermark() {
    return Long.MIN_VALUE;
}

}); {% endhighlight %}

override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000

override def getCurrentWatermark: Long = Long.MinValue }) {% endhighlight %}

If you know that timestamps of events are always ascending, i.e., elements arrive in order, you can use the AscendingTimestampExtractor, and the system generates watermarks automatically:

In order to write a program with ingestion time semantics, you need to set env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime). You can think of this setting as a shortcut for writing a TimestampExtractor which assignes timestamps to events at the sources based on the current source wall-clock time. Flink injects this timestamp extractor automatically.

Windows on Keyed Data Streams

Flink offers a variety of methods for defining windows on a KeyedStream. All of these group elements per key, i.e., each window will contain elements with the same key value.

Basic Window Constructs

Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows for common use cases. See first if your use case can be served by the pre-defined windows below before moving to defining your own windows.

Advanced Window Constructs

The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second, but the execution of the window function is triggered when 100 elements have been added to the window, and every time execution is triggered, 10 elements are retained in the window:

The general recipe for building a custom window is to specify (1) a WindowAssigner, (2) a Trigger (optionally), and (3) an Evictor (optionally).

The WindowAssigner defines how incoming elements are assigned to windows. A window is a logical group of elements that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according to some notion of time described above within these values are part of the window).

For example, the SlidingTimeWindows assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that time starts from 0 and is measured in milliseconds. Then, we have 6 windows that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your own window types by extending the WindowAssigner class.

The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated (“fires”) for each window. If a trigger is not specified, a default trigger for each window type is used (that is part of the definition of the WindowAssigner). Flink comes bundled with a set of triggers if the ones that windows use by default do not fit the application. You can write your own trigger by implementing the Trigger interface. Note that specifying a trigger will override the default trigger of the window assigner.

After the trigger fires, and before the function (e.g., sum, count) is applied to the window contents, an optional Evictor removes some elements from the beginning of the window before the remaining elements are passed on to the function. Flink comes bundled with a set of evictors You can write your own evictor by implementing the Evictor interface.

Recipes for Building Windows

The mechanism of window assigner, trigger, and evictor is very powerful, and it allows you to define many different kinds of windows. Flink's basic window constructs are, in fact, syntactic sugar on top of the general mechanism. Below is how some common types of windows can be constructed using the general mechanism

Windows on Unkeyed Data Streams

You can also define windows on regular (non-keyed) data streams using the windowAll transformation. These windowed data streams have all the capabilities of keyed windowed data streams, but are evaluated at a single task (and hence at a single computing node). The syntax for defining triggers and evictors is exactly the same:

Basic window definitions are also available for windows on non-keyed streams:

Back to top

Execution Parameters

Fault Tolerance

Flink has a checkpointing mechanism that recovers streaming jobs after failues. The checkpointing mechanism requires a persistent or durable source that can be asked for prior records again (Apache Kafka is a good example of a durable source).

The checkpointing mechanism stores the progress in the source as well as the user-defined state (see Working with State) consistently to provide exactly once processing guarantees.

To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other parameters for checkpointing include:

  • Number of retries: The setNumberOfExecutionRerties() method defines how many times the job is restarted after a failure. When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
  • exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.

The docs on streaming fault tolerance describe in detail the technique behind Flink's streaming fault tolerance mechanism.

Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:

To guarantee end-to-end exactly-once record delivery (in addition to exactly-once updates), the data sink needs to take part in the snapshotting mechanism. The following table lists the delivery guarantees (assuming exactly-once state updates) of Flink coupled with bundled sinks:

Parallelism

You can control the number of parallel instances created for each operator by calling the operator.setParallelism(int) method.

Controlling Latency

By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.

Usage:

env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); {% endhighlight %}

env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis) {% endhighlight %}

To maximize throughput, set setBufferTimeout(-1) which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.

Back to top

Working with State

All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators. You can make every transformation (map, filter, etc) stateful by declaring local variables or using Flink‘s state interface. You can register any local variable as managed state by implementing an interface. In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

The end effect is that updates to any form of state are the same under failure-free execution and execution under failures.

First, we look at how to make local variables consistent under failures, and then we look at Flink's state interface.

Making Local Variables Consistent

Local variables can be made consistent by using the Checkpointed interface.

When the user defined function implements the Checkpointed interface, the snapshotState(…) and restoreState(…) methods will be executed to draw and restore function state.

In addition to that, user functions can also implement the CheckpointNotifier interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.

For example the same counting, reduce function shown for OperatorStates by using the Checkpointed interface instead:

{% highlight java %} public class CounterSum implements ReduceFunction, Checkpointed {

//persistent counter
private long counter = 0;

@Override
public Long reduce(Long value1, Long value2) throws Exception {
    counter++;
    return value1 + value2;
}

// regularly persists state during normal operation
@Override
public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    return counter;
}

// restores state on recovery from failure
@Override
public void restoreState(Long state) {
    counter = state;
}

} {% endhighlight %}

Using the State Interface

Flink supports two types of operator states: partitioned and non-partitioned states.

In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When OperatorState.value() is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, value() will return the number of inputs processed by each parallel mapper.

In case of of partitioned operator state a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.

Checkpointing of the states needs to be enabled from the StreamExecutionEnvironment using the enableCheckpointing(…) where additional parameters can be passed to modify the default 5 second checkpoint interval.

Operator states can be accessed from the RuntimeContext using the getOperatorState(“name”, defaultValue, partitioned) method so it is only accessible in RichFunctions. A recommended usage pattern is to retrieve the operator state in the open(…) method of the operator and set it as a field in the operator instance for runtime usage. Multiple OperatorStates can be used simultaneously by the same operator by using different names to identify them.

Partitioned operator state is only supported on KeyedStreams.

By default operator states are checkpointed using default java serialization thus they need to be Serializable. The user can gain more control over the state checkpoint mechanism by passing a StateCheckpointer instance when retrieving the OperatorState from the RuntimeContext. The StateCheckpointer allows custom implementations for the checkpointing logic for increased efficiency and to store arbitrary non-serializable states.

By default state checkpoints will be stored in-memory at the JobManager. Flink also supports storing the checkpoints on Flink-supported file system which can be set in the flink-conf.yaml. Note that the state backend must be accessible from the JobManager, use file:// only for local setups.

For example let us write a reduce function that besides summing the data it also counts have many elements it has seen.

{% highlight java %} public class CounterSum implements RichReduceFunction {

//persistent counter
private OperatorState<Long> counter;

@Override
public Long reduce(Long value1, Long value2) throws Exception {
    counter.update(counter.value() + 1);
    return value1 + value2;
}

@Override
public void open(Configuration config) {
    counter = getRuntimeContext().getOperatorState(“counter”, 0L, false);
}

} {% endhighlight %}

Stateful sources require a bit more care as opposed to other operators they are not data driven, but their run(SourceContext) methods potentially run infinitely. In order to make the updates to the state and output collection atomic the user is required to get a lock from the source's context.

{% highlight java %} public static class CounterSource implements RichParallelSourceFunction {

// utility for job cancellation
private volatile boolean isRunning = false;

// maintain the current offset for exactly once semantics
private OperatorState<Long> offset;

@Override
public void run(SourceContext<Long> ctx) throws Exception {
    isRunning = true;
    Object lock = ctx.getCheckpointLock();
    
    while (isRunning) {
        // output and state update are atomic
        synchronized (lock){
            ctx.collect(offset);
            offset.update(offset.value() + 1);
        }
    }
}

@Override
public void open(Configuration config) {
    offset = getRuntimeContext().getOperatorState(“offset”, 0L);
}

@Override
public void cancel() {
    isRunning = false;
}

} {% endhighlight %}

Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the flink.streaming.api.checkpoint.CheckpointNotifier interface.

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

Back to top

Iterations

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example using filters. First, we define an IterativeStream

{% highlight java %} IterativeStream iteration = input.iterate(); {% endhighlight %}

Then, we specify the logic that will be executed inside the loop using a series of trasformations (here a simple map transformation)

{% highlight java %} DataStream iterationBody = iteration.map(/* this is executed many times */); {% endhighlight %}

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the strem that is fed back, and the part of the stream which is propagated forward. These filters can, e.g., define the “termination” logic, where an element is allowed to propagate downstream rather than being fed back.

{% highlight java %} iteration.closeWith(tail.filter(iterationBody.filter(/* one part of the stream /))); DataStream output = iterationBody.filter(/ some other part of the stream */); {% endhighlight %}

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:

{% highlight java %} DataStream someIntegers = env.generateSequence(0, 1000);

IterativeStream iteration = someIntegers.iterate();

DataStream minusOne = iteration.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return value - 1 ; } });

DataStream stillGreaterThanZero = minusOne.filter(new FilterFunction() { @Override public boolean filter(Long value) throws Exception { return (value > 0); } });

iteration.closeWith(stillGreaterThanZero);

DataStream lessThanZero = minusOne.filter(new FilterFunction() { @Override public boolean filter(Long value) throws Exception { return (value <= 0); } }); {% endhighlight %}

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are distinguished by the elements that are forwarded downstream using filters.

{% highlight scala %} val iteratedStream = someDataStream.iterate( iteration => { val iterationBody = iteration.map(/* this is executed many times /) (tail.filter(/ one part of the stream /), tail.filter(/ some other part of the stream */)) }) {% endhighlight %}

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:

{% highlight scala %} val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate( iteration => { val minusOne = iteration.map( v => v - 1) val stillGreaterThanZero = minusOne.filter (_ > 0) val lessThanZero = minusOne.filter(_ <= 0) (stillGreaterThanZero, lessThanZero) } ) {% endhighlight %}

Back to top

Connectors

Connectors provide code for interfacing with various third-party systems.

Currently these systems are supported:

To run an application using one of these connectors, additional third party components are usually required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. Docker containers are also provided encapsulating these services to aid users getting started with connectors.

Apache Kafka

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data to Kafka topics. The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to provide different processing guarantees (most importantly exactly-once guarantees).

For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers. The Kafka consumer might commit offsets to Kafka which have not been processed successfully.

Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the flink-connector-kafka-083 package and the FlinkKafkaConsumer082 class are appropriate.

Then, import the connector in your maven project:

{% highlight xml %} org.apache.flink flink-connector-kafka {{site.version }} {% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Kafka

  • Follow the instructions from Kafka's quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • On 32 bit computers this problem may occur.
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file the must be set to the machine's IP address.

Kafka Consumer

The standard FlinkKafkaConsumer082 is a Kafka consumer providing access to one topic.

The following parameters have to be provided for the FlinkKafkaConsumer082(...) constructor:

  1. The topic name
  2. A DeserializationSchema
  3. Properties for the Kafka consumer. The following properties are required:
  • “bootstrap.servers” (comma separated list of Kafka brokers)
  • “zookeeper.connect” (comma separated list of Zookeeper servers)
  • “group.id” the id of the consumer group

Example:

Kafka Consumers and Fault Tolerance

As Kafka persists all the data, a fault tolerant Kafka consumer can be provided.

The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will continue on reading from where it left off after a restart. For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

Kafka Sink

A class providing an interface for sending data to Kafka.

The following arguments have to be provided for the KafkaSink(…) constructor in order:

  1. Broker address (in hostname:port format, can be a comma separated list)
  2. The topic name
  3. Serialization schema

Example:

The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:

If this constructor is used, the user needs to make sure to set the broker(s) with the “metadata.broker.list” property. Also the serializer configuration should be left default, and the serialization should be set via SerializationSchema.

The Apache Kafka official documentation can be found here.

Back to top

Elasticsearch

This connector provides a Sink that can write to an Elasticsearch Index. To use this connector, add the following dependency to your project:

{% highlight xml %} org.apache.flink flink-connector-elasticsearch {{site.version }} {% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating a Sink for writing to your cluster

Elasticsearch Sink

The connector provides a Sink that can send data to an Elasticsearch Index.

The sink can use two different methods for communicating with Elasticsearch:

  1. An embedded Node
  2. The TransportClient

See here for information about the differences between the two modes.

This code shows how to create a sink that uses an embedded Node for communication:

Map<String, String> config = Maps.newHashMap(); // This instructs the sink to emit after every element, otherwise they would be buffered config.put(“bulk.flush.max.actions”, “1”); config.put(“cluster.name”, “my-cluster-name”);

input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder() { @Override public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { Map<String, Object> json = new HashMap<>(); json.put(“data”, element);

    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
}

})); {% endhighlight %}

val config = new util.HashMap[String, String] config.put(“bulk.flush.max.actions”, “1”) config.put(“cluster.name”, “my-cluster-name”)

text.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] { override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = { val json = new util.HashMap[String, AnyRef] json.put(“data”, element) println("SENDING: " + element) Requests.indexRequest.index(“my-index”).type(“my-type”).source(json) } })) {% endhighlight %}

Note how a Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter that must correspond to the name of your cluster.

Internally, the sink uses a BulkProcessor to send index requests to the cluster. This will buffer elements before sending a request to the cluster. The behaviour of the BulkProcessor can be configured using these config keys:

  • bulk.flush.max.actions: Maximum amount of elements to buffer
  • bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer
  • bulk.flush.interval.ms: Interval at which to flush data regardless of the other two settings in milliseconds

This example code does the same, but with a TransportClient:

Map<String, String> config = Maps.newHashMap(); // This instructs the sink to emit after every element, otherwise they would be buffered config.put(“bulk.flush.max.actions”, “1”); config.put(“cluster.name”, “my-cluster-name”);

List transports = new ArrayList(); transports.add(new InetSocketTransportAddress(“node-1”, 9300)); transports.add(new InetSocketTransportAddress(“node-2”, 9300));

input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder() { @Override public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { Map<String, Object> json = new HashMap<>(); json.put(“data”, element);

    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
}

})); {% endhighlight %}

val config = new util.HashMap[String, String] config.put(“bulk.flush.max.actions”, “1”) config.put(“cluster.name”, “my-cluster-name”)

val transports = new ArrayList[String] transports.add(new InetSocketTransportAddress(“node-1”, 9300)) transports.add(new InetSocketTransportAddress(“node-2”, 9300))

text.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] { override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = { val json = new util.HashMap[String, AnyRef] json.put(“data”, element) println("SENDING: " + element) Requests.indexRequest.index(“my-index”).type(“my-type”).source(json) } })) {% endhighlight %}

The difference is that we now need to provide a list of Elasticsearch Nodes to which the sink should connect using a TransportClient.

More about information about Elasticsearch can be found here.

Back to top

Hadoop FileSystem

This connector provides a Sink that writes rolling files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project:

{% highlight xml %} org.apache.flink flink-connector-filesystem {{site.version}} {% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Rolling File Sink

The rolling behaviour as well as the writing can be configured but we will get to that later. This is how you can create a default rolling sink:

input.addSink(new RollingSink(“/base/path”));

{% endhighlight %}

input.addSink(new RollingSink(“/base/path”))

{% endhighlight %}

The only required parameter is the base path where the rolling files (buckets) will be stored. The sink can be configured by specifying a custom bucketer, writer and batch size.

By default the rolling sink will use the pattern "yyyy-MM-dd--HH" to name the rolling buckets. This pattern is passed to SimpleDateFormat with the current system time to form a bucket path. A new bucket will be created whenever the bucket path changes. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: Each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. To specify a custom bucketer use setBucketer() on a RollingSink.

The default writer is StringWriter. This will call toString() on the incoming elements and write them to part files, separated by newline. To specify a custom writer use setWriter() on a RollingSink. If you want to write Hadoop SequenceFiles you can use the provided SequenceFileWriter which can also be configured to use compression.

The last configuration option is the batch size. This specifies when a part file should be closed and a new one started. (The default part file size is 384 MB).

Example:

RollingSink sink = new RollingSink(“/base/path”); sink.setBucketer(new DateTimeBucketer(“yyyy-MM-dd--HHmm”)); sink.setWriter(new SequenceFileWriter<IntWritable, Text>()); sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

input.addSink(sink);

{% endhighlight %}

val sink = new RollingSinkString sink.setBucketer(new DateTimeBucketer(“yyyy-MM-dd--HHmm”)) sink.setWriter(new SequenceFileWriterIntWritable, Text) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,

input.addSink(sink)

{% endhighlight %}

This will create a sink that writes to bucket files that follow this schema:

/base/path/{date-time}/part-{parallel-task}-{count}

Where date-time is the string that we get from the date/time format, parallel-task is the index of the parallel sink instance and count is the running number of part files that where created because of the batch size.

For in-depth information, please refer to the JavaDoc for RollingSink.

Back to top

RabbitMQ

This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:

{% highlight xml %} org.apache.flink flink-connector-rabbitmq {{site.version }} {% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.

Installing RabbitMQ

Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.

RabbitMQ Source

A class providing an interface for receiving data from RabbitMQ.

The followings have to be provided for the RMQSource(…) constructor in order:

  1. The hostname
  2. The queue name
  3. Deserialization schema

Example:

RabbitMQ Sink

A class providing an interface for sending data to RabbitMQ.

The followings have to be provided for the RMQSink(…) constructor in order:

  1. The hostname
  2. The queue name
  3. Serialization schema

Example:

More about RabbitMQ can be found here.

Back to top

Twitter Streaming API

Twitter Streaming API provides opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in TwitterSource class for establishing a connection to this stream. To use this connector, add the following dependency to your project:

{% highlight xml %} org.apache.flink flink-connector-twitter {{site.version }} {% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.

Authentication

In order to connect to Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.

Acquiring the authentication information

First of all, a Twitter account is needed. Sign up for free at twitter.com/signup or sign in at Twitter's Application Management and register the application by clicking on the “Create New App” button. Fill out a form about your program and accept the Terms and Conditions. After selecting the application, the API key and API secret (called consumerKey and sonsumerSecret in TwitterSource respectively) is located on the “API Keys” tab. The necessary access token data (token and secret) can be acquired here. Remember to keep these pieces of information secret and do not push them to public repositories.

Accessing the authentication information

Create a properties file, and pass its path in the constructor of TwitterSource. The content of the file should be similar to this:

#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***

Constructors

The TwitterSource class has two constructors.

  1. public TwitterSource(String authPath, int numberOfTweets); to emit finite number of tweets
  2. public TwitterSource(String authPath); for streaming

Both constructors expect a String authPath argument determining the location of the properties file containing the authentication information. In the first case, numberOfTweets determines how many tweet the source emits.

Usage

In contrast to other connectors, the TwitterSource depends on no additional services. For example the following code should run gracefully:

The TwitterSource emits strings containing a JSON code. To retrieve information from the JSON code you can add a FlatMap or a Map function handling JSON code. For example, there is an implementation JSONParseFlatMap abstract class among the examples. JSONParseFlatMap is an extension of the FlatMapFunction and has a

function which can be use to acquire the value of a given field.

There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.

Example

TwitterLocal is an example how to use TwitterSource. It implements a language frequency counter program.

Back to top

Docker containers for connectors

A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running on the docker container while the example topology can be run on the user's computer.

Installing Docker

The official Docker installation guide can be found here. After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.

Creating a jar with all the dependencies

For the easiest setup, create a jar with all the dependencies of the flink-streaming-connectors project.

cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
mvn assembly:assembly
~~~bash

This creates an assembly jar under *flink-streaming-connectors/target*.

#### RabbitMQ
Pull the docker image:

~~~bash
sudo docker pull flinkstreaming/flink-connectors-rabbitmq

To run the container, type:

sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq

Now a terminal has started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost‘s and the Docker container’s ports so RabbitMQ can communicate with the application through these.

To start the RabbitMQ server:

sudo /etc/init.d/rabbitmq-server start

To launch the example on the host computer, execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \
> log.txt 2> errorlog.txt

There are two connectors in the example. One that sends messages to RabbitMQ, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:

<DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <two> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <three> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <four> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <five> arrived from RMQ

Apache Kafka

Pull the image:

sudo docker pull flinkstreaming/flink-connectors-kafka

To run the container type:

sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \
flinkstreaming/flink-connectors-kafka

Now a terminal has started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost‘s and the Docker container’s ports so Kafka can communicate with the application through these. First start a zookeeper in the background:

/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \
> zookeeperlog.txt &

Then start the kafka server in the background:

/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \
 > serverlog.txt 2> servererr.txt &

To launch the example on the host computer execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \
> log.txt 2> errorlog.txt

In the example there are two connectors. One that sends messages to Kafka, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:

<DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (1) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (2) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (3) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (4) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (5) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (6) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (7) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (8) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (9) arrived from Kafka

Back to top

Program Packaging & Distributed Execution

See the relevant section of the DataSet API documentation.

Back to top

Parallel Execution

See the relevant section of the DataSet API documentation.

Back to top

Execution Plans

See the relevant section of the DataSet API documentation.

Back to top