blob: d87ee9c243aefe90f5d3a7398bce007fa36bda29 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.10-SNAPSHOT Documentation: Flink Stream Processing API</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/apis/scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Flink Stream Processing API <span class="badge">Beta</span></h1>
<p><a id="top"></a></p>
<p>Flink Streaming is a system for high-throughput, low-latency data stream processing. Flink Streaming natively supports <a href="#stateful-computation">stateful computation</a>, data-driven <a href="#window-operators">windowing semantics</a> and <a href="#iterations">iterative</a> stream processing. The system can connect to and process data streams from different data sources like file sources, web sockets, message queues (Apache Kafka, RabbitMQ, Twitter Streaming API …), and also from any user defined data sources. Data streams can be transformed and modified to create new data streams using high-level functions similar to the ones provided by the batch processing API.</p>
<ul id="markdown-toc">
<li><a href="#flink-streaming-api" id="markdown-toc-flink-streaming-api">Flink Streaming API</a></li>
<li><a href="#example-program" id="markdown-toc-example-program">Example Program</a></li>
<li><a href="#program-skeleton" id="markdown-toc-program-skeleton">Program Skeleton</a></li>
<li><a href="#basics" id="markdown-toc-basics">Basics</a> <ul>
<li><a href="#datastream" id="markdown-toc-datastream">DataStream</a></li>
<li><a href="#object-reuse-behavior" id="markdown-toc-object-reuse-behavior">Object Reuse Behavior</a></li>
<li><a href="#partitioning" id="markdown-toc-partitioning">Partitioning</a></li>
<li><a href="#connecting-to-the-outside-world" id="markdown-toc-connecting-to-the-outside-world">Connecting to the outside world</a></li>
</ul>
</li>
<li><a href="#transformations" id="markdown-toc-transformations">Transformations</a> <ul>
<li><a href="#basic-transformations" id="markdown-toc-basic-transformations">Basic transformations</a></li>
<li><a href="#grouped-operators" id="markdown-toc-grouped-operators">Grouped operators</a></li>
<li><a href="#aggregations" id="markdown-toc-aggregations">Aggregations</a></li>
<li><a href="#window-operators" id="markdown-toc-window-operators">Window operators</a></li>
<li><a href="#temporal-database-style-operators" id="markdown-toc-temporal-database-style-operators">Temporal database style operators</a></li>
<li><a href="#co-operators" id="markdown-toc-co-operators">Co operators</a></li>
<li><a href="#output-splitting" id="markdown-toc-output-splitting">Output splitting</a></li>
<li><a href="#iterations" id="markdown-toc-iterations">Iterations</a></li>
<li><a href="#rich-functions" id="markdown-toc-rich-functions">Rich functions</a></li>
</ul>
</li>
<li><a href="#stateful-computation" id="markdown-toc-stateful-computation">Stateful computation</a> <ul>
<li><a href="#operatorstate" id="markdown-toc-operatorstate">OperatorState</a></li>
<li><a href="#checkpointed-interface" id="markdown-toc-checkpointed-interface">Checkpointed interface</a></li>
<li><a href="#state-checkpoints-in-iterative-jobs" id="markdown-toc-state-checkpoints-in-iterative-jobs">State checkpoints in iterative jobs</a></li>
</ul>
</li>
<li><a href="#lambda-expressions-with-java-8" id="markdown-toc-lambda-expressions-with-java-8">Lambda expressions with Java 8</a></li>
<li><a href="#operator-settings" id="markdown-toc-operator-settings">Operator Settings</a> <ul>
<li><a href="#parallelism" id="markdown-toc-parallelism">Parallelism</a></li>
<li><a href="#buffer-timeout" id="markdown-toc-buffer-timeout">Buffer timeout</a></li>
</ul>
</li>
<li><a href="#stream-connectors" id="markdown-toc-stream-connectors">Stream connectors</a> <ul>
<li><a href="#apache-kafka" id="markdown-toc-apache-kafka">Apache Kafka</a></li>
<li><a href="#rabbitmq" id="markdown-toc-rabbitmq">RabbitMQ</a></li>
<li><a href="#twitter-streaming-api" id="markdown-toc-twitter-streaming-api">Twitter Streaming API</a></li>
<li><a href="#docker-containers-for-connectors" id="markdown-toc-docker-containers-for-connectors">Docker containers for connectors</a></li>
</ul>
</li>
</ul>
<h2 id="flink-streaming-api">Flink Streaming API</h2>
<p>The Streaming API is currently part of the <em>flink-staging</em> Maven project. All relevant classes are located in the <em>org.apache.flink.streaming</em> package.</p>
<p>Add the following dependencies to your <code>pom.xml</code> to use Flink Streaming.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-streaming-core<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-clients<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-streaming-scala<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-clients<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
</div>
</div>
<p>In order to create your own Flink Streaming program, we encourage you to start with the <a href="#program-skeleton">skeleton</a> and gradually add your own <a href="#transformations">transformations</a>. The remaining sections act as references for additional transformations and advanced features.</p>
<h2 id="example-program">Example Program</h2>
<p>The following program is a complete, working example of a streaming WordCount that incrementally counts the words coming from a web socket. You can copy &amp; paste the code to run it locally.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">StreamingWordCount</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
<span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">dataStream</span> <span class="o">=</span> <span class="n">env</span>
<span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Splitter</span><span class="o">())</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">dataStream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Socket Stream WordCount&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Splitter</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">sentence</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="nl">word:</span> <span class="n">sentence</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">WordCount</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="k">val</span> <span class="n">text</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span>
<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">text</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">toLowerCase</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)</span> <span class="n">filter</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">nonEmpty</span> <span class="o">}</span> <span class="o">}</span>
<span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="n">counts</span><span class="o">.</span><span class="n">print</span>
<span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">&quot;Scala Socket Stream WordCount&quot;</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
<p>To run the example program, start the input stream with netcat first from a terminal:</p>
<div class="highlight"><pre><code class="language-bash">nc -lk 9999</code></pre></div>
<p>The lines typed to this terminal will be the source data stream for your streaming job.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="program-skeleton">Program Skeleton</h2>
<div class="codetabs">
<div data-lang="java">
<p>As presented in the <a href="#example-program">example</a>, a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:</p>
<ol>
<li>Obtaining a <code>StreamExecutionEnvironment</code>,</li>
<li>Connecting to data stream sources,</li>
<li>Specifying transformations on the data streams,</li>
<li>Specifying output for the processed data,</li>
<li>Executing the program.</li>
</ol>
<p>As these steps are basically the same as in the batch API, we will only note the important differences.
For stream processing jobs, the user needs to obtain a <code>StreamExecutionEnvironment</code> in contrast with the <a href="programming_guide.html#program-skeleton">batch API</a> where one would need an <code>ExecutionEnvironment</code>. Otherwise, the process is essentially the same:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">createLocalEnvironment</span><span class="o">(</span><span class="n">parallelism</span><span class="o">);</span>
<span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">createRemoteEnvironment</span><span class="o">(</span><span class="n">String</span> <span class="n">host</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">,</span> <span class="kt">int</span> <span class="n">parallelism</span><span class="o">,</span> <span class="n">String</span><span class="o">...</span> <span class="n">jarFiles</span><span class="o">);</span></code></pre></div>
<p>For connecting to data streams the <code>StreamExecutionEnvironment</code> has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the <a href="#basics">basics</a> section.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">env</span><span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span>
<span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="n">elements</span><span class="err">…</span><span class="o">);</span>
<span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="n">sourceFunction</span><span class="o">);</span></code></pre></div>
<p>After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the <a href="#transformations">transformations</a> section.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">mapFunction</span><span class="o">).</span><span class="na">reduce</span><span class="o">(</span><span class="n">reduceFunction</span><span class="o">);</span></code></pre></div>
<p>The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(</span><span class="n">path</span><span class="o">);</span>
<span class="n">dataStream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">dataStream</span><span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="n">sinkFunction</span><span class="o">);</span></code></pre></div>
<p>Once the complete program is specified <code>execute(programName)</code> is to be called on the <code>StreamExecutionEnvironment</code>. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="n">programName</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<p>As presented in the <a href="#example-program">example</a>, a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:</p>
<ol>
<li>Obtaining a <code>StreamExecutionEnvironment</code>,</li>
<li>Connecting to data stream sources,</li>
<li>Specifying transformations on the data streams,</li>
<li>Specifying output for the processed data,</li>
<li>Executing the program.</li>
</ol>
<p>As these steps are basically the same as in the batch API, we will only note the important differences.
For stream processing jobs, the user needs to obtain a <code>StreamExecutionEnvironment</code> in contrast with the <a href="programming_guide.html#program-skeleton">batch API</a> where one would need an <code>ExecutionEnvironment</code>. The process otherwise is essentially the same:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">createLocalEnvironment</span><span class="o">(</span><span class="n">parallelism</span><span class="o">)</span>
<span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">createRemoteEnvironment</span><span class="o">(</span><span class="n">host</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">parallelism</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">jarFiles</span><span class="k">:</span> <span class="kt">String*</span><span class="o">)</span></code></pre></div>
<p>For connecting to data streams the <code>StreamExecutionEnvironment</code> has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the <a href="#basics">basics</a> section.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">env</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">)</span>
<span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="n">elements</span><span class="err">…</span><span class="o">)</span>
<span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">sourceFunction</span><span class="o">)</span></code></pre></div>
<p>After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the <a href="#transformations">transformations</a> section.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">mapFunction</span><span class="o">).</span><span class="n">reduce</span><span class="o">(</span><span class="n">reduceFunction</span><span class="o">)</span></code></pre></div>
<p>The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">writeAsCsv</span><span class="o">(</span><span class="n">path</span><span class="o">)</span>
<span class="n">dataStream</span><span class="o">.</span><span class="n">print</span>
<span class="n">dataStream</span><span class="o">.</span><span class="n">addSink</span><span class="o">(</span><span class="n">sinkFunction</span><span class="o">)</span></code></pre></div>
<p>Once the complete program is specified <code>execute(programName)</code> is to be called on the <code>StreamExecutionEnvironment</code>. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="n">programName</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p><a href="#top">Back to top</a></p>
<h2 id="basics">Basics</h2>
<h3 id="datastream">DataStream</h3>
<p>The <code>DataStream</code> is the basic data abstraction provided by Flink Streaming. It represents a continuous, parallel, immutable stream of data of a certain type. By applying transformations the user can create new data streams or output the results of the computations. For instance, the map transformation creates a new <code>DataStream</code> by applying a user defined function on each element of a given <code>DataStream</code>.</p>
<p>The transformations may return different data stream types, allowing more elaborate transformations. For example, the <code>groupBy(…)</code> method returns a <code>GroupedDataStream</code> which can be used for grouped transformations such as aggregating by key. We will discover more elaborate data stream types in the upcoming sections.</p>
<h3 id="object-reuse-behavior">Object Reuse Behavior</h3>
<p>Apache Flink is trying to reduce the number of object allocations for better performance.</p>
<p>By default, user defined functions (like <code>map()</code> or <code>reduce()</code>) are getting new objects on each call
(or through an iterator). So it is possible to keep references to the objects inside the function
(for example in a List).</p>
<p>There is a switch at the <code>ExecutionConfig</code> which allows users to enable the object reuse mode:</p>
<p><code>
env.getExecutionConfig().enableObjectReuse()
</code></p>
<p>For mutable types, Flink will reuse object
instances. In practice that means that a <code>map()</code> function will always receive the same object
instance (with its fields set to new values). The object reuse mode will lead to better performance
because fewer objects are created, but the user has to manually take care of what they are doing
with the object references.</p>
<h3 id="partitioning">Partitioning</h3>
<p>Partitioning controls how individual data points of a stream are distributed among the parallel instances of the transformation operators. This also controls the ordering of the records in the <code>DataStream</code>. There is partial ordering guarantee for the outputs with respect to the partitioning scheme (outputs produced from each partition are guaranteed to arrive in the order they were produced).</p>
<p>There are several partitioning types supported in Flink Streaming:</p>
<ul>
<li><em>Forward (default)</em>: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. If there are more processing nodes than inputs or vice versa the load is distributed among the extra nodes in a round-robin fashion. This is the default partitioner.
Usage: <code>dataStream.forward()</code></li>
<li><em>Shuffle</em>: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load use <em>Rebalance</em>.
Usage: <code>dataStream.shuffle()</code></li>
<li><em>Rebalance</em>: Rebalance partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
Usage: <code>dataStream.rebalance()</code></li>
<li><em>Field/Key Partitioning</em>: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance.
Usage: <code>dataStream.partitionByHash(fields…)</code></li>
<li><em>Field/Key Grouping</em>: Field/Key grouping takes partitioning one step further and separates the elements into disjoint groups based on the hash code. These groups are processed separately by the next downstream operator.
Usage: <code>dataStream.groupBy(fields…)</code></li>
<li><em>Broadcast</em>: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
Usage: <code>dataStream.broadcast()</code></li>
<li><em>Global</em>: All data points are directed to the first instance of the operator.
Usage: <code>dataStream.global()</code></li>
</ul>
<p>By default <em>Forward</em> partitioning is used.</p>
<p>Partitioning does not remain in effect after a transformation, so it needs to be set again for subsequent operations.</p>
<h3 id="connecting-to-the-outside-world">Connecting to the outside world</h3>
<p>The user is expected to connect to the outside world through the source and the sink interfaces.</p>
<h4 id="sources">Sources</h4>
<p>Sources can be created by using <code>StreamExecutionEnvironment.addSource(sourceFunction)</code>.
Either use one of the source functions that come with Flink or write a custom source
by implementing the <code>SourceFunction</code> interface. By default, sources run with
parallelism of 1. To create parallel sources the user’s source function needs to implement
<code>ParallelSourceFunction</code> or extend <code>RichParallelSourceFunction</code> in which case the source will have
the parallelism of the environment. The parallelism for ParallelSourceFunctions can be changed
after creation by using <code>source.setParallelism(parallelism)</code>.</p>
<p>The <code>SourceFunction</code> interface has two methods: <code>run(SourceContext)</code> and <code>cancel()</code>. The <code>run()</code>
method is not expected to return until the source has either finished by itself or received
a cancel request. The source can communicate with the outside world using the source context. For
example, the <code>emit(element)</code> method is used to emit one element from the source. Most sources will
have an infinite while loop inside the <code>run()</code> method to read from the input and emit elements.
Upon invocation of the <code>cancel()</code> method the source is required to break out of its internal
loop and return from the <code>run()</code> method. A common implementation for this is the following:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MySource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">// utility for job cancellation</span>
<span class="kd">private</span> <span class="kd">volatile</span> <span class="kt">boolean</span> <span class="n">isRunning</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">run</span><span class="o">(</span><span class="n">SourceContext</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">ctx</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">isRunning</span> <span class="o">=</span> <span class="kc">true</span><span class="o">;</span>
<span class="k">while</span> <span class="o">(</span><span class="n">isRunning</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// the source runs, isRunning flag should be checked frequently</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// invoked by the framework in case of job cancellation</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">cancel</span><span class="o">()</span> <span class="o">{</span>
<span class="n">isRunning</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>In addition to the bounded data sources (with similar method signatures as the
<a href="programming_guide.html#data-sources">batch API</a>) there are several predefined stream sources
accessible from the <code>StreamExecutionEnvironment</code>:</p>
<ul>
<li>
<p><em>Socket text stream</em>: Creates a new <code>DataStream</code> that contains the strings received
from the given socket. Strings are decoded by the system’s default character set. The user
can optionally set the delimiters or the number of connection retries in case of errors.
Usage: <code>env.socketTextStream(hostname, port,…)</code></p>
</li>
<li>
<p><em>Text file stream</em>: Creates a new <code>DataStream</code> that contains the lines of the files created
(or modified) in a given directory. The system continuously monitors the given path, and processes
any new files or modifications based on the settings. The file will be read with the system’s
default character set.
Usage: <code>env.readFileStream(String path, long checkFrequencyMillis, WatchType watchType)</code></p>
</li>
<li>
<p><em>Message queue connectors</em>: There are pre-implemented connectors for a number of popular message
queue services, please refer to the section on <a href="#stream-connectors">connectors</a> for more details.</p>
</li>
<li>
<p><em>Custom source</em>: Creates a new <code>DataStream</code> by using a user defined <code>SourceFunction</code> implementation.
Usage: <code>env.addSource(sourceFunction)</code></p>
</li>
</ul>
<h4 id="sinks">Sinks</h4>
<p><code>DataStreamSink</code> represents the different outputs of Flink Streaming programs. The user can either define their own <code>SinkFunction</code> implementation or choose one of the available implementations (methods of <code>DataStream</code>).</p>
<p>For example:</p>
<ul>
<li><code>dataStream.print()</code> – Writes the <code>DataStream</code> to the standard output, practical for testing purposes</li>
<li><code>dataStream.writeAsText(parameters)</code> – Writes the <code>DataStream</code> to a text file</li>
<li><code>dataStream.writeAsCsv(parameters)</code> – Writes the <code>DataStream</code> to CSV format</li>
<li><code>dataStream.addSink(sinkFunction)</code> – Custom sink implementation</li>
</ul>
<p>There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on <a href="#stream-connectors">connectors</a> for more detail.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="transformations">Transformations</h2>
<p>Transformations, also called operators, represent the users’ business logic on the data stream. Operators consume data streams and produce new data streams. The user can chain and combine multiple operators on the data stream to produce the desired processing steps. Most of the operators work very similarly to the batch Flink API allowing developers to reason about <code>DataStream</code> the same way as they would about <code>DataSet</code>. At the same time there are operators that exploit the streaming nature of the data to allow advanced functionality.</p>
<h3 id="basic-transformations">Basic transformations</h3>
<p>Basic transformations can be seen as functions that operate on records of the data stream.</p>
<div class="codetabs">
<div data-lang="java">
<p><br /></p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Map</strong></td>
<td>
<p>Takes one element and produces one element. A map that doubles the values of the input stream:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="mi">2</span> <span class="o">*</span> <span class="n">value</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>FlatMap</strong></td>
<td>
<p>Takes one element and produces zero, one, or more elements. A flatmap that splits sentences to words:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span>
<span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">for</span><span class="o">(</span><span class="n">String</span> <span class="nl">word:</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)){</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Filter</strong></td>
<td>
<p>Evaluates a boolean function for each element and retains those for which the function returns true.
<br />
<br />
A filter that filters out zero values:
</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span> <span class="o">!=</span> <span class="mi">0</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a stream of elements into another stream by repeatedly combining two elements
into one and emits the current state after every reduction. Reduce may only be applied on a windowed or grouped data stream.
<br />
<strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value per window.
<br />
<br />
A reducer that sums up the incoming stream, the result is a stream of intermediate sums:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="n">ReduceFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value1</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">value2</span><span class="o">)</span>
<span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value1</span> <span class="o">+</span> <span class="n">value2</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Fold</strong></td>
<td>
<p>Combines a stream element by element with an initial aggregator value. Fold may only be applied on a windowed or grouped data stream.
<br />
A folder that appends strings one by one to the empty string:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">fold</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">FoldFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">fold</span><span class="o">(</span><span class="n">String</span> <span class="n">accumulator</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">accumulator</span> <span class="o">+</span> <span class="n">value</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Union</strong></td>
<td>
<p>Union of two or more data streams creating a new stream containing all the elements from all the streams.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">union</span><span class="o">(</span><span class="n">otherStream1</span><span class="o">,</span> <span class="n">otherStream2</span><span class="o">,</span> <span class="err">…</span><span class="o">)</span></code></pre></div>
</td>
</tr>
</tbody>
</table>
<hr />
<p>The following transformations are available on data streams of Tuples:</p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Project</strong></td>
<td>
<p>Selects a subset of fields from the tuples</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span><span class="mi">0</span><span class="o">);</span></code></pre></div>
</td>
</tr>
</tbody>
</table>
</div>
<div data-lang="scala">
<p><br /></p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Map</strong></td>
<td>
<p>Takes one element and produces one element. A map that doubles the values of the input stream:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">map</span><span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="n">x</span> <span class="o">*</span> <span class="mi">2</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>FlatMap</strong></td>
<td>
<p>Takes one element and produces zero, one, or more elements. A flatmap that splits sentences to words:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="n">str</span> <span class="k">=&gt;</span> <span class="n">str</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Filter</strong></td>
<td>
<p>Evaluates a boolean function for each element and retains those for which the function returns true.
<br />
<br />
A filter that filters out zero values:
</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">filter</span><span class="o">{</span> <span class="k">_</span> <span class="o">!=</span> <span class="mi">0</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a stream of elements into another stream by repeatedly combining two elements
into one and emits the current state after every reduction. Reduce may only be applied on a windowed or grouped data stream.
<br />
<strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value per window.
<br />
<br />
A reducer that sums up the incoming stream, the result is a stream of intermediate sums:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">reduce</span><span class="o">{</span> <span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Fold</strong></td>
<td>
<p>Combines a stream element by element with an initial aggregator value. Fold may only be applied on a windowed or grouped data stream.
<br />
A folder that appends strings one by one to the empty string:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">fold</span><span class="o">{</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="k">_</span> <span class="o">+</span> <span class="k">_</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Union</strong></td>
<td>
<p>Union of two or more data streams creating a new stream containing all the elements from all the streams.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">union</span><span class="o">(</span><span class="n">otherStream1</span><span class="o">,</span> <span class="n">otherStream2</span><span class="o">,</span> <span class="err">…</span><span class="o">)</span></code></pre></div>
</td>
</tr>
</tbody>
</table>
</div>
</div>
<h3 id="grouped-operators">Grouped operators</h3>
<p>Some transformations require that the elements of a <code>DataStream</code> are grouped on some key. The user can create a <code>GroupedDataStream</code> by calling the <code>groupBy(key)</code> method of a non-grouped <code>DataStream</code>.
Keys can be of three types: field positions (applicable for tuple/array types), field expressions (applicable for pojo types), KeySelector instances.</p>
<p>Aggregation or reduce operators called on <code>GroupedDataStream</code>s produce elements on a per group basis.</p>
<h3 id="aggregations">Aggregations</h3>
<p>The Flink Streaming API supports different types of pre-defined aggregations of <code>GroupedDataStream</code>s and <code>WindowedDataStream</code>s. A common property of these operators is that they produce the stream of intermediate aggregate values.</p>
<p>Types of aggregations: <code>sum(field)</code>, <code>min(field)</code>, <code>max(field)</code>, <code>minBy(field, first)</code>, <code>maxBy(field, first)</code>.</p>
<p>With <code>sum</code>, <code>min</code>, and <code>max</code> for every incoming tuple the selected field is replaced with the current aggregated value. Fields can be selected using either field positions or field expressions (similarly to grouping).</p>
<p>With <code>minBy</code> and <code>maxBy</code> the output of the operator is the element with the current minimal or maximal value at the given field. If more components share the minimum or maximum value, the user can decide if the operator should return the first or last element. This can be set by the <code>first</code> boolean parameter.</p>
<h3 id="window-operators">Window operators</h3>
<p>Flink streaming provides very flexible data-driven windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduce, map or aggregation transformations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure.</p>
<p>The user can control the size (eviction) of the windows and the frequency of transformation or aggregation calls (trigger) on them in an intuitive API. We will describe the exact semantics of these operators in the <a href="#policy-based-windowing">policy based windowing</a> section.</p>
<p>Some examples:</p>
<ul>
<li><code>dataStream.window(eviction).every(trigger).reduceWindow(…)</code></li>
<li><code>dataStream.window(…).every(…).mapWindow(…).flatten()</code></li>
<li><code>dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()</code></li>
</ul>
<p>The core abstraction of the Windowing semantics is the <code>WindowedDataStream</code> and the <code>StreamWindow</code>. The <code>WindowedDataStream</code> is created when we first call the <code>window(…)</code> method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a <code>DataStream&lt;StreamWindow&lt;T&gt;&gt;</code> where additional API functions are supplied to provide efficient transformations of individual windows.</p>
<p>Please note at this point that the <code>.every(…)</code> call belongs together with the preceding <code>.window(…)</code> call and does not define a new transformation in itself.</p>
<p>The result of a window transformation is again a <code>WindowedDataStream</code> which can also be used to further apply other windowed computations. In this sense, window transformations define mapping from stream windows to stream windows.</p>
<p>The user has different ways of using the result of a window operation:</p>
<ul>
<li><code>windowedDataStream.flatten()</code> - streams the results element wise and returns a <code>DataStream&lt;T&gt;</code> where T is the type of the underlying windowed stream</li>
<li><code>windowedDataStream.getDiscretizedStream()</code> - returns a <code>DataStream&lt;StreamWindow&lt;T&gt;&gt;</code> for applying some advanced logic on the stream windows themselves. Be careful here, as at this point, we need to materialise the full windows</li>
<li>Calling any window transformation further transforms the windows, while preserving the windowing logic</li>
</ul>
<p>The next example would create windows that hold elements of the last 5 seconds, and the user defined transformation would be executed on the windows every second (sliding the window by 1 second):</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">every</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">));</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">TimeUnit</span><span class="o">.</span><span class="nc">SECONDS</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">TimeUnit</span><span class="o">.</span><span class="nc">SECONDS</span><span class="o">))</span></code></pre></div>
</div>
</div>
<p>This approach is often referred to as policy based windowing. Different policies (count, time, etc.) can be mixed as well, for example to downsample our stream, a window that takes the latest 100 elements of the stream every minute is created as follows:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Count</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">100</span><span class="o">)).</span><span class="na">every</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">));</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Count</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">100</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">TimeUnit</span><span class="o">.</span><span class="nc">MINUTES</span><span class="o">))</span></code></pre></div>
</div>
</div>
<p>The user can also omit the <code>every(…)</code> call which results in a tumbling window emptying the window after every transformation call.</p>
<p>Several predefined policies are provided in the API, including delta-based, count-based and time-based policies. These can be accessed through the static methods provided by the <code>PolicyHelper</code> classes:</p>
<ul>
<li><code>Time.of(…)</code></li>
<li><code>Count.of(…)</code></li>
<li><code>Delta.of(…)</code></li>
<li><code>FullStream.window()</code></li>
</ul>
<p>For a detailed description of these policies please refer to the <a href="http://flink.apache.org/docs/latest/api/java/">Javadocs</a>.</p>
<h4 id="policy-based-windowing">Policy based windowing</h4>
<p>The policy based windowing is a highly flexible way to specify stream discretisation also called windowing semantics. Two types of policies are used for such a specification:</p>
<ul>
<li>
<p><code>TriggerPolicy</code> defines when to trigger the reduce or transformation UDF on the current window and emit the result. In the API it completes a window statement such as: <code>window(…).every(…)</code>, while the triggering policy is passed within <code>every</code>. If the user wants to use a tumbling eviction policy (the window is emptied after the transformation), they can omit the <code>.every(…)</code> call and pass the trigger policy directly to the <code>.window(…)</code> call.</p>
</li>
<li>
<p><code>EvictionPolicy</code> defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the <code>window(…)</code> operation on a stream. There are mostly the same predefined policy types provided as for trigger policies.</p>
</li>
</ul>
<p>Trigger and eviction policies work totally independently of each other. The eviction policy continuously maintains a window, into which it adds new elements and based on the eviction logic removes older elements in the order of arrival. The trigger policy, on the other hand, only decides at each new incoming element whether it should trigger computation (and output results) on the currently maintained window.</p>
<p>Several predefined policies are provided in the API, including delta-based, punctuation-based, count-based and time-based policies. Policies are in general UDFs and can implement any custom behaviour.</p>
<p>In addition to the <code>dataStream.window(…).every(…)</code> style, users can specifically pass the trigger and eviction policies during the window call:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">triggerPolicy</span><span class="o">,</span> <span class="n">evictionPolicy</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="n">triggerPolicy</span><span class="o">,</span> <span class="n">evictionPolicy</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>By default triggers can only trigger when a new element arrives. This might not be suitable for all the use-cases with low data rates. To also provide triggering between elements, so-called active policies can be used (the two interfaces controlling this special behaviour are <code>ActiveTriggerPolicy</code> and <code>CentralActiveTrigger</code>). The predefined time-based policies are already implemented in such a way and can serve as an example for user defined active policy implementations.</p>
<p>Time-based trigger and eviction policies can work with user defined <code>TimeStamp</code> implementations; these policies already cover most use cases.</p>
<h4 id="reduce-on-windowed-data-streams">Reduce on windowed data streams</h4>
<p>The <code>WindowedDataStream&lt;T&gt;.reduceWindow(ReduceFunction&lt;T&gt;)</code> transformation calls the user-defined <code>ReduceFunction</code> at every trigger on the records currently in the window. The user can also use the different pre-implemented streaming aggregations such as <code>sum, min, max, minBy</code> and <code>maxBy</code>.</p>
<p>The following is an example for a window reduce that sums the elements in the last minute with 10 seconds slide interval:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">)).</span><span class="na">every</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="n">field</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">TimeUnit</span><span class="o">.</span><span class="nc">MINUTES</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span><span class="nc">TimeUnit</span><span class="o">.</span><span class="nc">SECONDS</span><span class="o">)).</span><span class="n">sum</span><span class="o">(</span><span class="n">field</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h4 id="map-on-windowed-data-streams">Map on windowed data streams</h4>
<p>The <code>WindowedDataStream&lt;T&gt;.mapWindow(WindowMapFunction&lt;T,O&gt;)</code> transformation calls <code>mapWindow(…)</code> for each <code>StreamWindow</code> in the discretised stream, providing access to all elements in the window through the iterable interface. At each function call the output <code>StreamWindow&lt;O&gt;</code> will consist of all the elements collected to the collector. This allows a straightforward way of mapping one stream window to another.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">windowedDataStream</span><span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="n">windowMapFunction</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">windowedDataStream</span><span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">windowMapFunction</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h4 id="grouped-transformations-on-windowed-data-streams">Grouped transformations on windowed data streams</h4>
<p>Calling the <code>groupBy(…)</code> method on a windowed stream groups the elements by the given fields inside the stream windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a global fashion), but the user defined functions will be applied on a per group basis inside the window. This means that a call to <code>windowedStream.groupBy(…).reduceWindow(…)</code> will transform each window into another window consisting of as many elements as keys in the original window, with the reduced values per key. Similarly the <code>mapWindow</code> transformation is applied per group as well.</p>
<p>The user can also create discretisation on a per group basis calling <code>window(…).every(…)</code> on an already grouped data stream. This will apply the discretisation logic independently for each key.</p>
<p>To highlight the differences let us look at two examples.</p>
<p>To get the maximal value for each key on the last 100 elements (global) we use the first approach:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Count</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">100</span><span class="o">)).</span><span class="na">every</span><span class="o">(</span><span class="err">…</span><span class="o">).</span><span class="na">groupBy</span><span class="o">(</span><span class="n">groupingField</span><span class="o">).</span><span class="na">max</span><span class="o">(</span><span class="n">field</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Count</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">100</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="err">…</span><span class="o">).</span><span class="n">groupBy</span><span class="o">(</span><span class="n">groupingField</span><span class="o">).</span><span class="n">max</span><span class="o">(</span><span class="n">field</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>Using this approach we took the last 100 elements, divided them into groups by key, and then applied the aggregation. To create fixed size windows for every key, we need to bring the groupBy call before the window call. So to take the max for the last 100 elements in each group:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="n">groupingField</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Count</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">100</span><span class="o">)).</span><span class="na">every</span><span class="o">(</span><span class="err">…</span><span class="o">).</span><span class="na">max</span><span class="o">(</span><span class="n">field</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="n">groupingField</span><span class="o">).</span><span class="n">window</span><span class="o">(</span><span class="nc">Count</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">100</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="err">…</span><span class="o">).</span><span class="n">max</span><span class="o">(</span><span class="n">field</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>This will create separate windows for different keys and apply the trigger and eviction policies on a per group basis.</p>
<h4 id="applying-multiple-transformations-on-a-window">Applying multiple transformations on a window</h4>
<p>Using the <code>WindowedDataStream</code> abstraction we can apply several transformations one after another on the discretised streams without having to re-discretise it:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Count</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span class="na">groupBy</span><span class="o">(</span><span class="n">firstKey</span><span class="o">).</span><span class="na">mapWindow</span><span class="o">(</span><span class="err">…</span><span class="o">)</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="n">secondKey</span><span class="o">).</span><span class="na">reduceWindow</span><span class="o">(</span><span class="err">…</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Count</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span class="n">groupBy</span><span class="o">(</span><span class="n">firstKey</span><span class="o">).</span><span class="n">mapWindow</span><span class="o">(</span><span class="err">…</span><span class="o">)</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="n">secondKey</span><span class="o">).</span><span class="n">reduceWindow</span><span class="o">(</span><span class="err">…</span><span class="o">).</span><span class="n">flatten</span><span class="o">()</span></code></pre></div>
</div>
</div>
<p>The above call would create global windows of 1000 elements, group them by the first key, and then apply a mapWindow transformation. The resulting windowed stream will then be grouped by the second key and further reduced. The results of the reduce transformation are then flattened.</p>
<p>Notice that here we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (<code>groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)</code>) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements, but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream.</p>
<h4 id="periodic-aggregations-on-the-full-stream-history">Periodic aggregations on the full stream history</h4>
<p>Sometimes it is necessary to aggregate over all the previously seen data in the stream. For this purpose use either <code>dataStream.window(FullStream.window()).every(trigger)</code> or, equivalently, <code>dataStream.every(trigger)</code>.</p>
<h4 id="global-vs-local-discretisation">Global vs local discretisation</h4>
<p>By default all window discretisation calls (<code>dataStream.window(…)</code>) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a parallelism of 1 to be able to correctly execute the discretisation logic.</p>
<p>Sometimes it is sufficient to create local discretisations, which allows the discretiser to run in parallel and apply the given discretisation logic at every discretiser instance. To allow local discretisation use the <code>local()</code> method of the windowed data stream.</p>
<p>For example, <code>dataStream.window(Count.of(100)).maxBy(field)</code> would create global windows of 100 elements (Count discretises with parallelism of 1) and return the record with the max value by the selected field; alternatively the <code>dataStream.window(Count.of(100)).local().maxBy(field)</code> would create several count discretisers (as defined by the environment parallelism) and compute the max values accordingly.</p>
<h3 id="temporal-database-style-operators">Temporal database style operators</h3>
<p>While database style operators like joins (on key) and crosses are hard to define properly on data streams, a straightforward interpretation is to apply these operators on windows of the data streams.</p>
<p>Currently join and cross operators are supported only on time windows. We are working on alleviating this limitation in the next release.</p>
<p>Temporal operators take the current windows of both streams and apply the join/cross logic on these window pairs.</p>
<p>The Join transformation produces a new Tuple DataStream with two fields. Each tuple holds a joined element of the first input DataStream in the first tuple field and a matching element of the second input DataStream in the second field for the current window. The user can also supply a custom join function to control the produced elements.</p>
<p>The following code shows a default Join transformation using field position keys:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream1</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">dataStream2</span><span class="o">)</span>
<span class="o">.</span><span class="na">onWindow</span><span class="o">(</span><span class="n">windowing_params</span><span class="o">)</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="n">key_in_first</span><span class="o">)</span>
<span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="n">key_in_second</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">dataStream2</span><span class="o">)</span>
<span class="o">.</span><span class="n">onWindow</span><span class="o">(</span><span class="n">windowing_params</span><span class="o">)</span>
<span class="o">.</span><span class="n">where</span><span class="o">(</span><span class="n">key_in_first</span><span class="o">)</span>
<span class="o">.</span><span class="n">equalTo</span><span class="o">(</span><span class="n">key_in_second</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>The Cross transformation combines two <code>DataStream</code>s into one <code>DataStream</code>. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product. The user can also supply a custom cross function to control the produced elements.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream1</span><span class="o">.</span><span class="na">cross</span><span class="o">(</span><span class="n">dataStream2</span><span class="o">).</span><span class="na">onWindow</span><span class="o">(</span><span class="n">windowing_params</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream1</span> <span class="n">cross</span> <span class="n">dataStream2</span> <span class="n">onWindow</span> <span class="o">(</span><span class="n">windowing_params</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h3 id="co-operators">Co operators</h3>
<p>Co operators allow the users to jointly transform two <code>DataStream</code>s of different types, providing a simple way to jointly manipulate streams with a shared state. It is designed to support joint stream transformations where union is not appropriate due to different data types, or in case the user needs explicit tracking of the origin of individual elements.
Co operators can be applied to <code>ConnectedDataStream</code>s which represent two <code>DataStream</code>s of possibly different types. A <code>ConnectedDataStream</code> can be created by calling the <code>connect(otherDataStream)</code> method of a <code>DataStream</code>.</p>
<h4 id="map-on-connecteddatastream">Map on ConnectedDataStream</h4>
<p>Applies a CoMap transformation on two separate DataStreams, mapping them to a common output type. The transformation calls a <code>CoMapFunction.map1()</code> for each element of the first input and <code>CoMapFunction.map2()</code> for each element of the second input. Each CoMapFunction call returns exactly one element.
The following CoMap operator outputs true if an Integer value is received and false if a String value is received:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">dataStream1</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">dataStream2</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">dataStream1</span><span class="o">.</span><span class="na">connect</span><span class="o">(</span><span class="n">dataStream2</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">CoMapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Boolean</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Boolean</span> <span class="nf">map1</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="kc">true</span><span class="o">;</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Boolean</span> <span class="nf">map2</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="kc">false</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">})</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">dataStream1</span> <span class="k">:</span> <span class="kt">DataStream</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">dataStream2</span> <span class="k">:</span> <span class="kt">DataStream</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="o">(</span><span class="n">dataStream1</span> <span class="n">connect</span> <span class="n">dataStream2</span><span class="o">)</span>
<span class="o">.</span><span class="n">map</span><span class="o">(</span>
<span class="o">(</span><span class="k">_</span> <span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="kc">true</span><span class="o">,</span>
<span class="o">(</span><span class="k">_</span> <span class="k">:</span> <span class="kt">String</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="kc">false</span>
<span class="o">)</span></code></pre></div>
</div>
</div>
<h4 id="flatmap-on-connecteddatastream">FlatMap on ConnectedDataStream</h4>
<p>The FlatMap operator for the <code>ConnectedDataStream</code> works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">dataStream1</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">dataStream2</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">dataStream1</span><span class="o">.</span><span class="na">connect</span><span class="o">(</span><span class="n">dataStream2</span><span class="o">)</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">CoFlatMapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap1</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toString</span><span class="o">());</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap2</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="nl">word:</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">})</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">dataStream1</span> <span class="k">:</span> <span class="kt">DataStream</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">dataStream2</span> <span class="k">:</span> <span class="kt">DataStream</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="o">(</span><span class="n">dataStream1</span> <span class="n">connect</span> <span class="n">dataStream2</span><span class="o">)</span>
<span class="o">.</span><span class="n">flatMap</span><span class="o">(</span>
<span class="o">(</span><span class="n">num</span> <span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">List</span><span class="o">(</span><span class="n">num</span><span class="o">.</span><span class="n">toString</span><span class="o">),</span>
<span class="o">(</span><span class="n">str</span> <span class="k">:</span> <span class="kt">String</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">str</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span>
<span class="o">)</span></code></pre></div>
</div>
</div>
<h4 id="windowreduce-on-connecteddatastream">WindowReduce on ConnectedDataStream</h4>
<p>The windowReduce operator applies a user defined <code>CoWindowFunction</code> to time-aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.</p>
<h4 id="reduce-on-connecteddatastream">Reduce on ConnectedDataStream</h4>
<p>The Reduce operator for the <code>ConnectedDataStream</code> applies a group-reduce transformation on the grouped joined data streams and then maps the reduced elements to a common output type. It works only for connected data streams where the inputs are grouped.</p>
<h3 id="output-splitting">Output splitting</h3>
<div class="codetabs">
<div data-lang="java">
<p>Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">SplitDataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">split</span> <span class="o">=</span> <span class="n">someDataStream</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="n">outputSelector</span><span class="o">);</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">even</span> <span class="o">=</span> <span class="n">split</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">&quot;even&quot;</span><span class="o">);</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">odd</span> <span class="o">=</span> <span class="n">split</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">&quot;odd&quot;</span><span class="o">);</span></code></pre></div>
<p>In the above example the data stream named “even” will only contain elements that are directed to the output named “even”. The user can of course further transform these new streams by for example squaring only the even elements.</p>
<p>Data streams only receive the elements directed to selected output names. The user can also select multiple output names by <code>split.select(&quot;output1&quot;, &quot;output2&quot;, …)</code>. It is common that a stream listens to all the outputs, so <code>split.selectAll()</code> provides this functionality without having to select all names.</p>
<p>The outputs of an operator are directed by implementing a selector function (implementing the <code>OutputSelector</code> interface):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="nf">select</span><span class="o">(</span><span class="n">OUT</span> <span class="n">value</span><span class="o">);</span></code></pre></div>
<p>The data is sent to all the outputs returned in the iterable (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent.</p>
<p>For example to split even and odd numbers:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nd">@Override</span>
<span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="nf">select</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">outputs</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;();</span>
<span class="k">if</span> <span class="o">(</span><span class="n">value</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">==</span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
<span class="n">outputs</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="s">&quot;even&quot;</span><span class="o">);</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="n">outputs</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="s">&quot;odd&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="k">return</span> <span class="n">outputs</span><span class="o">;</span>
<span class="o">}</span></code></pre></div>
<p>Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.</p>
<p>The functionality provided by output splitting can also be achieved efficiently (due to operator chaining) by multiple filter operators.</p>
</div>
<div data-lang="scala">
<p>Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="n">someDataStream</span><span class="o">.</span><span class="n">split</span><span class="o">(</span>
<span class="o">(</span><span class="n">num</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="o">(</span><span class="n">num</span> <span class="o">%</span> <span class="mi">2</span><span class="o">)</span> <span class="k">match</span> <span class="o">{</span>
<span class="k">case</span> <span class="mi">0</span> <span class="k">=&gt;</span> <span class="nc">List</span><span class="o">(</span><span class="s">&quot;even&quot;</span><span class="o">)</span>
<span class="k">case</span> <span class="mi">1</span> <span class="k">=&gt;</span> <span class="nc">List</span><span class="o">(</span><span class="s">&quot;odd&quot;</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">)</span>
<span class="k">val</span> <span class="n">even</span> <span class="k">=</span> <span class="n">split</span> <span class="n">select</span> <span class="s">&quot;even&quot;</span>
<span class="k">val</span> <span class="n">odd</span> <span class="k">=</span> <span class="n">split</span> <span class="n">select</span> <span class="s">&quot;odd&quot;</span></code></pre></div>
<p>In the above example the data stream named “even” will only contain elements that are directed to the output named “even”. The user can of course further transform these new streams by for example squaring only the even elements.</p>
<p>Data streams only receive the elements directed to selected output names. The user can also select multiple output names by <code>splitStream.select(“output1”, “output2”, …)</code>. It is common that a stream listens to all the outputs, so <code>split.selectAll</code> provides this functionality without having to select all names.</p>
<p>The outputs of an operator are directed by implementing a function that returns the output names for the value. The data is sent to all the outputs returned by the function (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent.</p>
<p>Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.</p>
<p>The functionality provided by output splitting can also be achieved efficiently (due to operator chaining) by multiple filter operators.</p>
</div>
</div>
<h3 id="iterations">Iterations</h3>
<div class="codetabs">
<div data-lang="java">
<p>The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the batch Flink API. Iterative streaming programs also implement a step function and embed it into an <code>IterativeDataStream</code>.
Unlike in the batch API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail using <a href="#output-splitting">output splitting</a> or <a href="#filter">filters</a>.
To start an iterative part of the program the user defines the iteration starting point:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">IterativeDataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">iteration</span> <span class="o">=</span> <span class="n">source</span><span class="o">.</span><span class="na">iterate</span><span class="o">(</span><span class="n">maxWaitTimeMillis</span><span class="o">);</span></code></pre></div>
<p>The operator applied on the iteration starting point is the head of the iteration, where data is fed back from the iteration tail.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">head</span> <span class="o">=</span> <span class="n">iteration</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">IterationHead</span><span class="o">());</span></code></pre></div>
<p>To close an iteration and define the iteration tail, the user calls <code>closeWith(feedbackStream)</code> method of the <code>IterativeDataStream</code>. This iteration tail (the DataStream given to the <code>closeWith</code> function) will be fed back to the iteration head. A common pattern is to use <a href="#filter">filters</a> to separate the output of the iteration from the feedback-stream.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">tail</span> <span class="o">=</span> <span class="n">head</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">IterationTail</span><span class="o">());</span>
<span class="n">iteration</span><span class="o">.</span><span class="na">closeWith</span><span class="o">(</span><span class="n">tail</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">isFeedback</span><span class="o">));</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">tail</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">isOutput</span><span class="o">);</span></code></pre></div>
<p>In this case, all values passing the <code>isFeedback</code> filter will be fed back to the iteration head, and the values passing the <code>isOutput</code> filter will produce the output of the iteration that can be transformed further (here with a <code>map</code> and a <code>projection</code>) outside the iteration.</p>
<p>Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
To use this functionality the user needs to add the maxWaitTimeMillis parameter to the <code>dataStream.iterate(…)</code> call to control the max wait time.</p>
<p>By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the <code>closeWith</code> method.</p>
<h4 id="iteration-head-as-a-co-operator">Iteration head as a co-operator</h4>
<p>The user can also treat the input and feedback stream of a streaming iteration as a <code>ConnectedDataStream</code>. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.</p>
<p>To use this feature the user needs to call the <code>withFeedbackType(type)</code> method of the iterative data stream and pass the type of the feedback stream:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ConnectedIterativeDataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">coiteration</span> <span class="o">=</span> <span class="n">source</span><span class="o">.</span><span class="na">iterate</span><span class="o">(</span><span class="n">maxWaitTimeMillis</span><span class="o">).</span><span class="na">withFeedbackType</span><span class="o">(</span><span class="s">&quot;String&quot;</span><span class="o">);</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">head</span> <span class="o">=</span> <span class="n">coiteration</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">CoFlatMapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;(){})</span>
<span class="n">iteration</span><span class="o">.</span><span class="na">closeWith</span><span class="o">(</span><span class="n">head</span><span class="o">);</span></code></pre></div>
<p>In this case the original input of the head operator will be used as the first input to the co-operator and the feedback stream will be used as the second input.</p>
</div>
<div data-lang="scala">
<p>The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the batch Flink API. Iterative streaming programs also implement a step function and embed it into an <code>IterativeDataStream</code>.
Unlike in the batch API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail by defining a step function that returns two DataStreams: a feedback and an output. The first one is the output that will be fed back to the start of the iteration and the second is the output stream of the iterative part.</p>
<p>A common pattern is to use <a href="#filter">filters</a> to separate the output from the feedback-stream. In this case all values passing the <code>isFeedback</code> filter will be fed back to the iteration head, and the values passing the <code>isOutput</code> filter will produce the output of the iteration that can be transformed further (here with a <code>map</code> and a <code>projection</code>) outside the iteration.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">iteratedStream</span> <span class="k">=</span> <span class="n">someDataStream</span><span class="o">.</span><span class="n">iterate</span><span class="o">(</span>
<span class="n">iteration</span> <span class="k">=&gt;</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">head</span> <span class="k">=</span> <span class="n">iteration</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">iterationHead</span><span class="o">)</span>
<span class="k">val</span> <span class="n">tail</span> <span class="k">=</span> <span class="n">head</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">iterationTail</span><span class="o">)</span>
<span class="o">(</span><span class="n">tail</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">isFeedback</span><span class="o">),</span> <span class="n">tail</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">isOutput</span><span class="o">))</span>
<span class="o">},</span> <span class="n">maxWaitTimeMillis</span><span class="o">).</span><span class="n">map</span><span class="o">(</span>…<span class="o">).</span><span class="n">project</span><span class="o">(</span>…<span class="o">)</span></code></pre></div>
<p>Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
To use this functionality the user needs to add the maxWaitTimeMillis parameter to the <code>dataStream.iterate(…)</code> call to control the max wait time.</p>
<p>By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the <code>iterate</code> method.</p>
<h4 id="iteration-head-as-a-co-operator-1">Iteration head as a co-operator</h4>
<p>The user can also treat the input and feedback stream of a streaming iteration as a <code>ConnectedDataStream</code>. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.</p>
<p>To use this feature the user needs to implement a step function that operates on a <code>ConnectedDataStream</code> and pass it to the <code>iterate(…)</code> call.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">iteratedStream</span> <span class="k">=</span> <span class="n">someDataStream</span><span class="o">.</span><span class="n">iterate</span><span class="o">(</span>
<span class="n">stepFunction</span><span class="k">:</span> <span class="kt">ConnectedDataStream</span><span class="o">[</span><span class="kt">T</span>, <span class="kt">F</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="nc">DataStream</span><span class="o">[</span><span class="kt">F</span><span class="o">],</span> <span class="nc">DataStream</span><span class="o">[</span><span class="kt">R</span><span class="o">]),</span>
<span class="n">maxWaitTimeMillis</span><span class="o">)</span></code></pre></div>
<p>In this case the original input of the head operator will be used as the first input to the co-operator and the feedback stream will be used as the second input.</p>
</div>
</div>
<h3 id="rich-functions">Rich functions</h3>
<p>The <a href="programming_guide.html#rich-functions">usage</a> of rich functions is essentially the same as in the batch Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">dataStream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">RichMapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="cm">/* initialization of function */</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">map</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">toString</span><span class="o">();</span> <span class="o">}</span>
<span class="o">});</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dataStream</span> <span class="n">map</span>
<span class="k">new</span> <span class="nc">RichMapFunction</span><span class="o">[</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">open</span><span class="o">(</span><span class="n">config</span><span class="k">:</span> <span class="kt">Configuration</span><span class="o">)</span> <span class="k">=</span> <span class="o">{</span>
<span class="cm">/* initialization of function */</span>
<span class="o">}</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">value</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="n">toString</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
<p>Rich functions provide, in addition to the user-defined function (<code>map()</code>, <code>reduce()</code>, etc), the <code>open()</code> and <code>close()</code> methods for initialization and finalization.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="stateful-computation">Stateful computation</h2>
<p>Flink supports the checkpointing and persistence of user defined operator states, so in case of a failure this state can be restored to the latest checkpoint and the processing will continue from there. This gives exactly once processing semantics with respect to the operator states when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their OperatorState. The <code>PersistentKafkaSource</code> provides this stateful functionality for reading streams from Kafka.</p>
<h3 id="operatorstate">OperatorState</h3>
<p>Flink supports two types of operator states: partitioned and non-partitioned states.</p>
<p>In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When <code>OperatorState.getState()</code> is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, <code>getState()</code> will return number of inputs processed by each parallel mapper.</p>
<p>In case of partitioned operator state a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.</p>
<p>Checkpointing of the states needs to be enabled from the <code>StreamExecutionEnvironment</code> using the <code>enableCheckpointing(…)</code> where additional parameters can be passed to modify the default 5 second checkpoint interval.</p>
<p>Operator states can be accessed from the <code>RuntimeContext</code> using the <code>getOperatorState(“name”, defaultValue, partitioned)</code> method so it is only accessible in <code>RichFunction</code>s. A recommended usage pattern is to retrieve the operator state in the <code>open(…)</code> method of the operator and set it as a field in the operator instance for runtime usage. Multiple <code>OperatorState</code>s can be used simultaneously by the same operator by using different names to identify them.</p>
<p>Partitioned operator state works only on <code>KeyedDataStreams</code>. A <code>KeyedDataStream</code> can be created from <code>DataStream</code> using the <code>keyBy</code> or <code>groupBy</code> methods. The <code>keyBy</code> method simply takes a <code>KeySelector</code> to derive the keys by which the operator state will be partitioned, however, it does not affect the actual partitioning of the <code>DataStream</code> records. If data partitioning is also desired then the <code>groupBy</code> method should be used instead to create a <code>GroupedDataStream</code> which is a subtype of <code>KeyedDataStream</code>. Mind that <code>KeyedDataStreams</code> do not support repartitioning (e.g. <code>shuffle(), forward(), groupBy(...)</code>).</p>
<p>By default operator states are checkpointed using default java serialization thus they need to be <code>Serializable</code>. The user can gain more control over the state checkpoint mechanism by passing a <code>StateCheckpointer</code> instance when retrieving the <code>OperatorState</code> from the <code>RuntimeContext</code>. The <code>StateCheckpointer</code> allows custom implementations for the checkpointing logic for increased efficiency and to store arbitrary non-serializable states.</p>
<p>By default state checkpoints will be stored in-memory at the JobManager. Flink also supports storing the checkpoints on any flink-supported file system (such as HDFS or Tachyon) which can be set in the flink-conf.yaml. Note that the state backend must be accessible from the JobManager, use <code>file://</code> only for local setups.</p>
<p>For example let us write a reduce function that, besides summing the data, also counts how many elements it has seen.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">CounterSum</span> <span class="kd">implements</span> <span class="n">RichReduceFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">//persistent counter</span>
<span class="kd">private</span> <span class="n">OperatorState</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">counter</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Long</span> <span class="n">value1</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value2</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">counter</span><span class="o">.</span><span class="na">updateState</span><span class="o">(</span><span class="n">counter</span><span class="o">.</span><span class="na">getState</span><span class="o">()</span> <span class="o">+</span> <span class="mi">1</span><span class="o">);</span>
<span class="k">return</span> <span class="n">value1</span> <span class="o">+</span> <span class="n">value2</span><span class="o">;</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span>
<span class="n">counter</span> <span class="o">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getOperatorState</span><span class="o">(</span><span class="s">&quot;counter&quot;</span><span class="o">,</span> <span class="mi">0L</span><span class="o">,</span> <span class="kc">false</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>Stateful sources require a bit more care: as opposed to other operators, they are not data driven, but their <code>run(SourceContext)</code> methods potentially run infinitely. In order to make the updates to the state and output collection atomic the user is required to get a lock from the source’s context.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CounterSource</span> <span class="kd">implements</span> <span class="n">RichParallelSourceFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">// utility for job cancellation</span>
<span class="kd">private</span> <span class="kd">volatile</span> <span class="kt">boolean</span> <span class="n">isRunning</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
<span class="c1">// maintain the current offset for exactly once semantics</span>
<span class="kd">private</span> <span class="n">OperatorState</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">offset</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">run</span><span class="o">(</span><span class="n">SourceContext</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">ctx</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">isRunning</span> <span class="o">=</span> <span class="kc">true</span><span class="o">;</span>
<span class="n">Object</span> <span class="n">lock</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">getCheckpointLock</span><span class="o">();</span>
<span class="k">while</span> <span class="o">(</span><span class="n">isRunning</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// output and state update are atomic</span>
<span class="kd">synchronized</span> <span class="o">(</span><span class="n">lock</span><span class="o">){</span>
<span class="n">ctx</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">offset</span><span class="o">);</span>
<span class="n">offset</span><span class="o">.</span><span class="na">updateState</span><span class="o">(</span><span class="n">offset</span><span class="o">.</span><span class="na">getState</span><span class="o">()</span> <span class="o">+</span> <span class="mi">1</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span>
<span class="n">offset</span> <span class="o">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getOperatorState</span><span class="o">(</span><span class="s">&quot;offset&quot;</span><span class="o">,</span> <span class="mi">0L</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">cancel</span><span class="o">()</span> <span class="o">{</span>
<span class="n">isRunning</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the <code>flink.streaming.api.checkpoint.CheckpointCommitter</code> interface.</p>
<h3 id="checkpointed-interface">Checkpointed interface</h3>
<p>Another way of exposing user defined operator state for the Flink runtime for checkpointing is by using the <code>Checkpointed</code> interface.</p>
<p>When the user defined function implements the <code>Checkpointed</code> interface, the <code>snapshotState(…)</code> and <code>restoreState(…)</code> methods will be executed to draw and restore function state.</p>
<p>For example, the same counting reduce function shown for <code>OperatorState</code>s can be implemented using the <code>Checkpointed</code> interface instead:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">CounterSum</span> <span class="kd">implements</span> <span class="n">ReduceFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Checkpointed</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">//persistent counter</span>
<span class="kd">private</span> <span class="kt">long</span> <span class="n">counter</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Long</span> <span class="n">value1</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value2</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">counter</span><span class="o">++;</span>
<span class="k">return</span> <span class="n">value1</span> <span class="o">+</span> <span class="n">value2</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// regularly persists state during normal operation</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Serializable</span> <span class="nf">snapshotState</span><span class="o">(</span><span class="kt">long</span> <span class="n">checkpointId</span><span class="o">,</span> <span class="kt">long</span> <span class="n">checkpointTimestamp</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="nf">Long</span><span class="o">(</span><span class="n">counter</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// restores state on recovery from failure</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">restoreState</span><span class="o">(</span><span class="n">Serializable</span> <span class="n">state</span><span class="o">)</span> <span class="o">{</span>
<span class="n">counter</span> <span class="o">=</span> <span class="o">(</span><span class="n">Long</span><span class="o">)</span> <span class="n">state</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<h3 id="state-checkpoints-in-iterative-jobs">State checkpoints in iterative jobs</h3>
<p>Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: <code>env.enableCheckpointing(interval, force = true)</code>.</p>
<p>Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="lambda-expressions-with-java-8">Lambda expressions with Java 8</h2>
<p>For more concise code one can rely on one of the main features of Java 8: lambda expressions. The following program has similar functionality to the one provided in the <a href="#example-program">example</a> section, while showcasing the usage of lambda expressions.</p>
<div class="codetabs">
<div data-lang="java8">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">StreamingWordCount</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="kd">final</span> <span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
<span class="s">&quot;Who&#39;s there?&quot;</span><span class="o">,</span>
<span class="s">&quot;I think I hear them. Stand, ho! Who&#39;s there?&quot;</span><span class="o">);</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">counts</span> <span class="o">=</span>
<span class="c1">// normalize and split each line</span>
<span class="n">text</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">line</span> <span class="o">-&gt;</span> <span class="n">line</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">))</span>
<span class="c1">// convert splitted line in pairs (2-tuples) containing: (word,1)</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">((</span><span class="n">String</span><span class="o">[]</span> <span class="n">tokens</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="c1">// emit the pairs with non-zero-length words</span>
<span class="n">Arrays</span><span class="o">.</span><span class="na">stream</span><span class="o">(</span><span class="n">tokens</span><span class="o">)</span>
<span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">t</span> <span class="o">-&gt;</span> <span class="n">t</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">forEach</span><span class="o">(</span><span class="n">t</span> <span class="o">-&gt;</span> <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;&gt;(</span><span class="n">t</span><span class="o">,</span> <span class="mi">1</span><span class="o">)));</span>
<span class="o">})</span>
<span class="c1">// group by the tuple field &quot;0&quot; and sum up tuple field &quot;1&quot;</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">counts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Streaming WordCount&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
<p>For a detailed Java 8 Guide please refer to the <a href="java8_programming_guide.html">Java 8 Programming Guide</a>. Operators specific to streaming, such as output splitting, also support this usage. <a href="#output-splitting">Output splitting</a> can be rewritten as follows:</p>
<div class="codetabs">
<div data-lang="java8">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">SplitDataStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">split</span> <span class="o">=</span> <span class="n">someDataStream</span>
<span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="n">x</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">x</span> <span class="o">%</span> <span class="mi">2</span><span class="o">)));</span></code></pre></div>
</div>
</div>
<h2 id="operator-settings">Operator Settings</h2>
<h3 id="parallelism">Parallelism</h3>
<p>Setting parallelism for operators works exactly the same way as in the batch Flink API. The user can control the number of parallel instances created for each operator by calling the <code>operator.setParallelism(parallelism)</code> method.</p>
<h3 id="buffer-timeout">Buffer timeout</h3>
<p>By default, data points are not transferred on the network one-by-one, which would cause unnecessary network traffic, but are buffered in the output buffers. The size of the output buffers can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough.
To tackle this issue the user can call <code>env.setBufferTimeout(timeoutMillis)</code> on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are flushed automatically even if they are not full. The default value for this timeout is 100 ms, which should be appropriate for most use-cases.</p>
<p>Usage:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">LocalStreamEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">createLocalEnvironment</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">setBufferTimeout</span><span class="o">(</span><span class="n">timeoutMillis</span><span class="o">);</span>
<span class="n">env</span><span class="o">.</span><span class="na">generateSequence</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span><span class="mi">10</span><span class="o">).</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyMapper</span><span class="o">()).</span><span class="na">setBufferTimeout</span><span class="o">(</span><span class="n">timeoutMillis</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">LocalStreamEnvironment</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">createLocalEnvironment</span>
<span class="n">env</span><span class="o">.</span><span class="n">setBufferTimeout</span><span class="o">(</span><span class="n">timeoutMillis</span><span class="o">)</span>
<span class="n">env</span><span class="o">.</span><span class="n">generateSequence</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span><span class="mi">10</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">myMap</span><span class="o">).</span><span class="n">setBufferTimeout</span><span class="o">(</span><span class="n">timeoutMillis</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>To maximise the throughput the user can call <code>setBufferTimeout(-1)</code> which will remove the timeout and buffers will only be flushed when they are full.
To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically, a buffer timeout of 0 will cause all output to be flushed when produced, but this setting should be avoided, because it can cause severe performance degradation.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="stream-connectors">Stream connectors</h2>
<!-- TODO: reintroduce flume -->
<p>Connectors provide an interface for accessing data from various third party sources (message queues). Currently three connectors are natively supported, namely <a href="https://kafka.apache.org/">Apache Kafka</a>, <a href="http://www.rabbitmq.com/">RabbitMQ</a> and the <a href="https://dev.twitter.com/docs/streaming-apis">Twitter Streaming API</a>.</p>
<p>Typically the connector packages consist of a source and sink class (with the exception of Twitter where only a source is provided). To use these sources the user needs to pass Serialization/Deserialization schemas for the connectors for the desired types. (Or use some predefined ones)</p>
<p>To run an application using one of these connectors, additional third party components are usually required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. <a href="#docker-containers-for-connectors">Docker containers</a> are also provided encapsulating these services to aid users getting started with connectors.</p>
<h3 id="apache-kafka">Apache Kafka</h3>
<p>This connector provides access to data streams from <a href="https://kafka.apache.org/">Apache Kafka</a>. To use this connector, add the following dependency to your project:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-connector-kafka<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p>Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution <a href="cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution">here</a>.</p>
<h4 id="installing-apache-kafka">Installing Apache Kafka</h4>
<ul>
<li>Follow the instructions from <a href="https://kafka.apache.org/documentation.html#quickstart">Kafka’s quickstart</a> to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).</li>
<li>On 32 bit computers <a href="http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in">this</a> problem may occur.</li>
<li>If the Kafka and Zookeeper servers are running on a remote machine, then the <code>advertised.host.name</code> setting in the <code>config/server.properties</code> file must be set to the machine’s IP address.</li>
</ul>
<h4 id="kafka-source">Kafka Source</h4>
<p>The standard <code>KafkaSource</code> is a Kafka consumer providing access to one topic.</p>
<p>The following parameters have to be provided for the <code>KafkaSource(...)</code> constructor:</p>
<ol>
<li>Zookeeper hostname</li>
<li>The topic name</li>
<li>Deserialization schema</li>
</ol>
<p>Example:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">env</span>
<span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">KafkaSource</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&quot;localhost:2181&quot;</span><span class="o">,</span> <span class="s">&quot;test&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">SimpleStringSchema</span><span class="o">()))</span>
<span class="o">.</span><span class="na">print</span><span class="o">();</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">stream</span> <span class="k">=</span> <span class="n">env</span>
<span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nc">KafkaSource</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;localhost:2181&quot;</span><span class="o">,</span> <span class="s">&quot;test&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nc">SimpleStringSchema</span><span class="o">))</span>
<span class="o">.</span><span class="n">print</span></code></pre></div>
</div>
</div>
<h4 id="persistent-kafka-source">Persistent Kafka Source</h4>
<p>As Kafka persists all the data, a fault tolerant Kafka source can be provided.</p>
<p>The PersistentKafkaSource can read a topic, and if the job fails for some reason, the source will
continue on reading from where it left off after a restart.
For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job
failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.</p>
<p>To use fault tolerant Kafka Sources, monitoring of the topology needs to be enabled at the execution environment:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">enableMonitoring</span><span class="o">(</span><span class="mi">5000</span><span class="o">);</span></code></pre></div>
</div>
</div>
<p>Also note that Flink can only restart the topology if enough processing slots are available to restart the topology.
So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
Flink on YARN supports automatic restart of lost YARN containers.</p>
<p>The following arguments have to be provided for the <code>PersistentKafkaSource(...)</code> constructor:</p>
<ol>
<li>Zookeeper hostname</li>
<li>The topic name</li>
<li>Deserialization schema</li>
</ol>
<p>Example:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">PersistentKafkaSource</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&quot;localhost:2181&quot;</span><span class="o">,</span> <span class="s">&quot;test&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">SimpleStringSchema</span><span class="o">()));</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">stream</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nc">PersistentKafkaSource</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;localhost:2181&quot;</span><span class="o">,</span> <span class="s">&quot;test&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nc">SimpleStringSchema</span><span class="o">))</span></code></pre></div>
</div>
</div>
<h4 id="kafka-sink">Kafka Sink</h4>
<p>A class providing an interface for sending data to Kafka.</p>
<p>The following parameters have to be provided for the <code>KafkaSink(…)</code> constructor, in order:</p>
<ol>
<li>Zookeeper hostname</li>
<li>The topic name</li>
<li>Serialization schema</li>
</ol>
<p>Example:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="k">new</span> <span class="n">KafkaSink</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&quot;localhost:2181&quot;</span><span class="o">,</span> <span class="s">&quot;test&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">SimpleStringSchema</span><span class="o">()));</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">stream</span><span class="o">.</span><span class="n">addSink</span><span class="o">(</span><span class="k">new</span> <span class="nc">KafkaSink</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;localhost:2181&quot;</span><span class="o">,</span> <span class="s">&quot;test&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nc">SimpleStringSchema</span><span class="o">))</span></code></pre></div>
</div>
</div>
<p>The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">KafkaSink</span><span class="o">(</span><span class="n">String</span> <span class="n">zookeeperAddress</span><span class="o">,</span> <span class="n">String</span> <span class="n">topicId</span><span class="o">,</span> <span class="n">Properties</span> <span class="n">producerConfig</span><span class="o">,</span>
<span class="n">SerializationSchema</span><span class="o">&lt;</span><span class="n">IN</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span> <span class="n">serializationSchema</span><span class="o">)</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">public</span> <span class="nc">KafkaSink</span><span class="o">(</span><span class="nc">String</span> <span class="n">zookeeperAddress</span><span class="o">,</span> <span class="nc">String</span> <span class="n">topicId</span><span class="o">,</span> <span class="nc">Properties</span> <span class="n">producerConfig</span><span class="o">,</span>
<span class="nc">SerializationSchema</span> <span class="n">serializationSchema</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>If this constructor is used, the user needs to make sure to set the broker with the “metadata.broker.list” property. Also the serializer configuration should be left default, the serialization should be set via SerializationSchema.</p>
<p>More about Kafka can be found <a href="https://kafka.apache.org/documentation.html">here</a>.</p>
<p><a href="#top">Back to top</a></p>
<h3 id="rabbitmq">RabbitMQ</h3>
<p>This connector provides access to data streams from <a href="http://www.rabbitmq.com/">RabbitMQ</a>. To use this connector, add the following dependency to your project:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-connector-rabbitmq<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p>Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution <a href="cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution">here</a>.</p>
<h4 id="installing-rabbitmq">Installing RabbitMQ</h4>
<p>Follow the instructions from the <a href="http://www.rabbitmq.com/download.html">RabbitMQ download page</a>. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.</p>
<h4 id="rabbitmq-source">RabbitMQ Source</h4>
<p>A class providing an interface for receiving data from RabbitMQ.</p>
<p>The following parameters have to be provided for the <code>RMQSource(…)</code> constructor, in order:</p>
<ol>
<li>The hostname</li>
<li>The queue name</li>
<li>Deserialization schema</li>
</ol>
<p>Example:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">env</span>
<span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">RMQSource</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">SimpleStringSchema</span><span class="o">()))</span>
<span class="o">.</span><span class="na">print</span><span class="o">();</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">stream</span> <span class="k">=</span> <span class="n">env</span>
<span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nc">RMQSource</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nc">SimpleStringSchema</span><span class="o">))</span>
<span class="o">.</span><span class="n">print</span></code></pre></div>
</div>
</div>
<h4 id="rabbitmq-sink">RabbitMQ Sink</h4>
<p>A class providing an interface for sending data to RabbitMQ.</p>
<p>The following parameters have to be provided for the <code>RMQSink(…)</code> constructor, in order:</p>
<ol>
<li>The hostname</li>
<li>The queue name</li>
<li>Serialization schema</li>
</ol>
<p>Example:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="k">new</span> <span class="n">RMQSink</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">StringToByteSerializer</span><span class="o">()));</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">stream</span><span class="o">.</span><span class="n">addSink</span><span class="o">(</span><span class="k">new</span> <span class="nc">RMQSink</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nc">StringToByteSerializer</span><span class="o">))</span></code></pre></div>
</div>
</div>
<p>More about RabbitMQ can be found <a href="http://www.rabbitmq.com/">here</a>.</p>
<p><a href="#top">Back to top</a></p>
<h3 id="twitter-streaming-api">Twitter Streaming API</h3>
<p>The Twitter Streaming API provides the opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in <code>TwitterSource</code> class for establishing a connection to this stream. To use this connector, add the following dependency to your project:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-connector-twitter<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p>Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution <a href="cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution">here</a>.</p>
<h4 id="authentication">Authentication</h4>
<p>In order to connect to Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.</p>
<h4 id="acquiring-the-authentication-information">Acquiring the authentication information</h4>
<p>First of all, a Twitter account is needed. Sign up for free at <a href="https://twitter.com/signup">twitter.com/signup</a> or sign in at Twitter’s <a href="https://apps.twitter.com/">Application Management</a> and register the application by clicking on the “Create New App” button. Fill out a form about your program and accept the Terms and Conditions.
After selecting the application, the API key and API secret (called <code>consumerKey</code> and <code>consumerSecret</code> in <code>TwitterSource</code> respectively) are located on the “API Keys” tab. The necessary access token data (<code>token</code> and <code>secret</code>) can be acquired here.
Remember to keep these pieces of information a secret and do not push them to public repositories.</p>
<h4 id="accessing-the-authentication-information">Accessing the authentication information</h4>
<p>Create a properties file, and pass its path in the constructor of <code>TwitterSource</code>. The content of the file should be similar to this:</p>
<div class="highlight"><pre><code class="language-bash"><span class="c">#properties file for my app</span>
<span class="nv">secret</span><span class="o">=</span>***
<span class="nv">consumerSecret</span><span class="o">=</span>***
<span class="nv">token</span><span class="o">=</span>***-***
<span class="nv">consumerKey</span><span class="o">=</span>***</code></pre></div>
<h4 id="constructors">Constructors</h4>
<p>The <code>TwitterSource</code> class has two constructors.</p>
<ol>
<li><code>public TwitterSource(String authPath, int numberOfTweets);</code>
to emit finite number of tweets</li>
<li><code>public TwitterSource(String authPath);</code>
for streaming</li>
</ol>
<p>Both constructors expect a <code>String authPath</code> argument determining the location of the properties file containing the authentication information. In the first case, <code>numberOfTweets</code> determines how many tweets the source emits.</p>
<h4 id="usage">Usage</h4>
<p>In contrast to other connectors, the <code>TwitterSource</code> depends on no additional services. For example the following code should run gracefully:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">streamSource</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TwitterSource</span><span class="o">(</span><span class="s">&quot;/PATH/TO/myFile.properties&quot;</span><span class="o">));</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">streamSource</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nc">TwitterSource</span><span class="o">(</span><span class="s">&quot;/PATH/TO/myFile.properties&quot;</span><span class="o">))</span></code></pre></div>
</div>
</div>
<p>The <code>TwitterSource</code> emits strings containing a JSON code.
To retrieve information from the JSON code you can add a FlatMap or a Map function handling JSON code. For example, there is an implementation <code>JSONParseFlatMap</code> abstract class among the examples. <code>JSONParseFlatMap</code> is an extension of the <code>FlatMapFunction</code> and has a</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">String</span> <span class="nf">getField</span><span class="o">(</span><span class="n">String</span> <span class="n">jsonText</span><span class="o">,</span> <span class="n">String</span> <span class="n">field</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">getField</span><span class="o">(</span><span class="n">jsonText</span> <span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">field</span> <span class="k">:</span> <span class="kt">String</span><span class="o">)</span> <span class="k">:</span> <span class="kt">String</span></code></pre></div>
</div>
</div>
<p>function which can be used to acquire the value of a given field.</p>
<p>There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.</p>
<h4 id="example">Example</h4>
<p><code>TwitterLocal</code> is an example of how to use <code>TwitterSource</code>. It implements a language frequency counter program.</p>
<p><a href="#top">Back to top</a></p>
<h3 id="docker-containers-for-connectors">Docker containers for connectors</h3>
<p>A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running on the docker container while the example topology can be run on the user’s computer.</p>
<h4 id="installing-docker">Installing Docker</h4>
<p>To install Docker, follow the <a href="https://docs.docker.com/installation/">official Docker installation guide</a>.
After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.</p>
<h4 id="creating-a-jar-with-all-the-dependencies">Creating a jar with all the dependencies</h4>
<p>For the easiest setup, create a jar with all the dependencies of the <em>flink-streaming-connectors</em> project.</p>
<div class="highlight"><pre><code class="language-bash"><span class="nb">cd</span> /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
mvn assembly:assembly</code></pre></div>
<p>This creates an assembly jar under <em>flink-streaming-connectors/target</em>.</p>
<h4 id="rabbitmq-1">RabbitMQ</h4>
<p>Pull the docker image:</p>
<div class="highlight"><pre><code class="language-bash">sudo docker pull flinkstreaming/flink-connectors-rabbitmq</code></pre></div>
<p>To run the container, type:</p>
<div class="highlight"><pre><code class="language-bash">sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq</code></pre></div>
<p>Now a terminal has started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost’s and the Docker container’s ports so RabbitMQ can communicate with the application through these.</p>
<p>To start the RabbitMQ server:</p>
<div class="highlight"><pre><code class="language-bash">sudo /etc/init.d/rabbitmq-server start</code></pre></div>
<p>To launch the example on the host computer, execute:</p>
<div class="highlight"><pre><code class="language-bash">java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology <span class="se">\</span>
&gt; log.txt 2&gt; errorlog.txt</code></pre></div>
<p>There are two connectors in the example. One that sends messages to RabbitMQ, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:</p>
<div class="highlight"><pre><code>&lt;DATE&gt; INFO rabbitmq.RMQTopology: String: &lt;one&gt; arrived from RMQ
&lt;DATE&gt; INFO rabbitmq.RMQTopology: String: &lt;two&gt; arrived from RMQ
&lt;DATE&gt; INFO rabbitmq.RMQTopology: String: &lt;three&gt; arrived from RMQ
&lt;DATE&gt; INFO rabbitmq.RMQTopology: String: &lt;four&gt; arrived from RMQ
&lt;DATE&gt; INFO rabbitmq.RMQTopology: String: &lt;five&gt; arrived from RMQ
</code></pre></div>
<h4 id="apache-kafka-1">Apache Kafka</h4>
<p>Pull the image:</p>
<div class="highlight"><pre><code class="language-bash">sudo docker pull flinkstreaming/flink-connectors-kafka</code></pre></div>
<p>To run the container, type:</p>
<div class="highlight"><pre><code class="language-bash">sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i <span class="se">\</span>
flinkstreaming/flink-connectors-kafka</code></pre></div>
<p>Now a terminal has started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost’s and the Docker container’s ports so Kafka can communicate with the application through these.
First, start a ZooKeeper server in the background:</p>
<div class="highlight"><pre><code class="language-bash">/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties <span class="se">\</span>
&gt; zookeeperlog.txt <span class="p">&amp;</span></code></pre></div>
<p>Then start the Kafka server in the background:</p>
<div class="highlight"><pre><code class="language-bash">/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties <span class="se">\</span>
&gt; serverlog.txt 2&gt; servererr.txt <span class="p">&amp;</span></code></pre></div>
<p>To launch the example on the host computer, execute:</p>
<div class="highlight"><pre><code class="language-bash">java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology <span class="se">\</span>
&gt; log.txt 2&gt; errorlog.txt</code></pre></div>
<p>In the example there are two connectors. One that sends messages to Kafka, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:</p>
<div class="highlight"><pre><code>&lt;DATE&gt; INFO kafka.KafkaTopology: String: (0) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (1) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (2) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (3) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (4) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (5) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (6) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (7) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (8) arrived from Kafka
&lt;DATE&gt; INFO kafka.KafkaTopology: String: (9) arrived from Kafka
</code></pre></div>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
// Google Analytics loader.
// The IIFE is called with i=window, s=document, o='script',
// g='//www.google-analytics.com/analytics.js' and r='ga'; a and m are not
// passed and serve as local scratch variables.
// Steps performed by the IIFE, in order:
//   1. Record the name 'ga' on window under 'GoogleAnalyticsObject'.
//   2. Unless window.ga already exists, define it as a stub that pushes the
//      arguments of each call onto the array ga.q (created on first use).
//   3. Store the current time, coerced to a number, in ga.l.
//   4. Create a <script> element, mark it async, point its src at g, and
//      insert it before the first <script> element found in the document.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
// Issue the 'create' command for property 'UA-52545728-1' (with 'auto'), then
// the 'send' 'pageview' command, through the ga() function set up above.
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
// Disqus comment embed. The shortname stays a top-level variable and is used
// below to build the host name of the embed script.
var disqus_shortname = 'stratosphere-eu';
(function() {
  // Build the asynchronous <script> element that loads the embed code.
  var embedScript = document.createElement('script');
  embedScript.type = 'text/javascript';
  embedScript.async = true;
  embedScript.src = '//' + disqus_shortname + '.disqus.com/embed.js';

  // Attach it to <head> when one exists, otherwise fall back to <body>.
  var headElements = document.getElementsByTagName('head');
  var mountPoint = headElements[0] || document.getElementsByTagName('body')[0];
  mountPoint.appendChild(embedScript);
})();
</script>
</body>
</html>