<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.10-SNAPSHOT Documentation: Flink Programming Guide</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Flink Programming Guide</h1>
<p><a href="#top"></a></p>
<p>Analysis programs in Flink are regular programs that implement transformations on data sets
(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
sources (e.g., by reading files, or from local collections). Results are returned via sinks, which may for
example write the data to (distributed) files, or to standard output (for example the command line
terminal). Flink programs run in a variety of contexts: standalone or embedded in other programs.
The execution can happen in a local JVM, or on clusters of many machines.</p>
<p>In order to create your own Flink program, we encourage you to start with the
<a href="#program-skeleton">program skeleton</a> and gradually add your own
<a href="#transformations">transformations</a>. The remaining sections act as references for additional
operations and advanced features.</p>
<ul id="markdown-toc">
<li><a href="#example-program" id="markdown-toc-example-program">Example Program</a></li>
<li><a href="#linking-with-flink" id="markdown-toc-linking-with-flink">Linking with Flink</a></li>
<li><a href="#program-skeleton" id="markdown-toc-program-skeleton">Program Skeleton</a></li>
<li><a href="#dataset-abstraction" id="markdown-toc-dataset-abstraction">DataSet abstraction</a></li>
<li><a href="#lazy-evaluation" id="markdown-toc-lazy-evaluation">Lazy Evaluation</a></li>
<li><a href="#transformations" id="markdown-toc-transformations">Transformations</a></li>
<li><a href="#specifying-keys" id="markdown-toc-specifying-keys">Specifying Keys</a></li>
<li><a href="#passing-functions-to-flink" id="markdown-toc-passing-functions-to-flink">Passing Functions to Flink</a></li>
<li><a href="#data-types" id="markdown-toc-data-types">Data Types</a></li>
<li><a href="#data-sources" id="markdown-toc-data-sources">Data Sources</a> <ul>
<li><a href="#read-compressed-files" id="markdown-toc-read-compressed-files">Read Compressed Files</a></li>
</ul>
</li>
<li><a href="#execution-configuration" id="markdown-toc-execution-configuration">Execution Configuration</a></li>
<li><a href="#data-sinks" id="markdown-toc-data-sinks">Data Sinks</a></li>
<li><a href="#debugging" id="markdown-toc-debugging">Debugging</a> <ul>
<li><a href="#local-execution-environment" id="markdown-toc-local-execution-environment">Local Execution Environment</a></li>
<li><a href="#collection-data-sources-and-sinks" id="markdown-toc-collection-data-sources-and-sinks">Collection Data Sources and Sinks</a></li>
</ul>
</li>
<li><a href="#iteration-operators" id="markdown-toc-iteration-operators">Iteration Operators</a></li>
<li><a href="#semantic-annotations" id="markdown-toc-semantic-annotations">Semantic Annotations</a></li>
<li><a href="#broadcast-variables" id="markdown-toc-broadcast-variables">Broadcast Variables</a></li>
<li><a href="#passing-parameters-to-functions" id="markdown-toc-passing-parameters-to-functions">Passing Parameters to Functions</a></li>
<li><a href="#program-packaging--distributed-execution" id="markdown-toc-program-packaging--distributed-execution">Program Packaging &amp; Distributed Execution</a></li>
<li><a href="#accumulators--counters" id="markdown-toc-accumulators--counters">Accumulators &amp; Counters</a></li>
<li><a href="#parallel-execution" id="markdown-toc-parallel-execution">Parallel Execution</a> <ul>
<li><a href="#operator-level" id="markdown-toc-operator-level">Operator Level</a></li>
<li><a href="#execution-environment-level" id="markdown-toc-execution-environment-level">Execution Environment Level</a></li>
<li><a href="#client-level" id="markdown-toc-client-level">Client Level</a></li>
<li><a href="#system-level" id="markdown-toc-system-level">System Level</a></li>
</ul>
</li>
<li><a href="#execution-plans" id="markdown-toc-execution-plans">Execution Plans</a></li>
</ul>
<h2 id="example-program">Example Program</h2>
<p>The following program is a complete, working example of WordCount. You can copy &amp; paste the code
to run it locally. You only have to include the correct Flink library in your project
(see Section <a href="#linking-with-flink">Linking with Flink</a>) and specify the imports. Then you are ready
to go!</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountExample</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
<span class="s">&quot;Who&#39;s there?&quot;</span><span class="o">,</span>
<span class="s">&quot;I think I hear them. Stand, ho! Who&#39;s there?&quot;</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">text</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">LineSplitter</span><span class="o">())</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">LineSplitter</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.flink.api.scala._</span>
<span class="k">object</span> <span class="nc">WordCount</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="k">val</span> <span class="n">text</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span>
<span class="s">&quot;Who&#39;s there?&quot;</span><span class="o">,</span>
<span class="s">&quot;I think I hear them. Stand, ho! Who&#39;s there?&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">text</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">toLowerCase</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)</span> <span class="n">filter</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">nonEmpty</span> <span class="o">}</span> <span class="o">}</span>
<span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="n">counts</span><span class="o">.</span><span class="n">print</span><span class="o">()</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
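<p>Note that the two tabs tokenize the input differently: the Java <code>LineSplitter</code> splits on single spaces and keeps case and punctuation, while the Scala version lowercases each line and splits on non-word characters. The following plain Java sketch (no Flink dependency; the class and method names are hypothetical and chosen only for illustration) reproduces both tokenization rules on the same input so the difference in the resulting counts can be inspected locally.</p>

```java
import java.util.Map;
import java.util.TreeMap;

public class TokenizationSketch {

    // Mirrors the Java tab: split on single spaces, keep case and punctuation.
    public static Map<String, Integer> countBySpaces(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Mirrors the Scala tab: lowercase, split on runs of non-word characters, drop empty tokens.
    public static Map<String, Integer> countByNonWord(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?"
        };
        System.out.println(countBySpaces(lines));
        System.out.println(countByNonWord(lines));
    }
}
```

<p>With the space-based rule, <code>Who's</code> and <code>there?</code> are each counted twice; with the non-word rule, the apostrophe splits <code>who's</code> into <code>who</code> and <code>s</code>, so <code>who</code>, <code>s</code>, and <code>there</code> are each counted twice.</p>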
<p><a href="#top">Back to top</a></p>
<h2 id="linking-with-flink">Linking with Flink</h2>
<p>To write programs with Flink, you need to include the Flink library corresponding to
your programming language in your project.</p>
<p>The simplest way to do this is to use one of the quickstart scripts: either for
<a href="http://flink.apache.org/docs/master/quickstart/java_api_quickstart.html">Java</a> or for <a href="http://flink.apache.org/docs/master/quickstart/scala_api_quickstart.html">Scala</a>. They
create a blank project from a template (a Maven Archetype), which sets up everything for you. To
manually create the project, you can use the archetype and create a project by calling:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-bash" data-lang="bash">mvn archetype:generate \
-DarchetypeGroupId<span class="o">=</span>org.apache.flink \
-DarchetypeArtifactId<span class="o">=</span>flink-quickstart-java \
-DarchetypeVersion<span class="o">=</span>0.10-SNAPSHOT</code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-bash" data-lang="bash">mvn archetype:generate \
-DarchetypeGroupId<span class="o">=</span>org.apache.flink \
-DarchetypeArtifactId<span class="o">=</span>flink-quickstart-scala \
-DarchetypeVersion<span class="o">=</span>0.10-SNAPSHOT</code></pre></div>
</div>
</div>
<p>The archetypes work for stable releases and preview versions (<code>-SNAPSHOT</code>).</p>
<p>If you want to add Flink to an existing Maven project, add the following entry to your
<em>dependencies</em> section in the <em>pom.xml</em> file of your project:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-java<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-clients<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-scala<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-clients<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10-SNAPSHOT<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<p><strong>Important:</strong> When working with the Scala API you must have one of these two imports:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.flink.api.scala._</span></code></pre></div>
<p>or</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.flink.api.scala.createTypeInformation</span></code></pre></div>
<p>The reason is that Flink analyzes the types that are used in a program and generates serializers
and comparators for them. By having either of those imports you enable an implicit conversion
that creates the type information for Flink operations.</p>
</div>
</div>
<h4 id="hadoop-dependency-versions">Hadoop Dependency Versions</h4>
<p>If you are using Flink together with Hadoop, the version of the dependency may vary depending on the
version of Hadoop (or more specifically, HDFS) that you want to use Flink with. Please refer to the
<a href="http://flink.apache.org/downloads.html">downloads page</a> for a list of available versions, and instructions
on how to link with custom versions of Hadoop.</p>
<p>In order to link against the latest SNAPSHOT versions of the code, please follow
<a href="http://flink.apache.org/how-to-contribute.html#snapshots-nightly-builds">this guide</a>.</p>
<p>The <em>flink-clients</em> dependency is only necessary to invoke the Flink program locally (for example to
run it standalone for testing and debugging). If you intend to only export the program as a JAR
file and <a href="cluster_execution.html">run it on a cluster</a>, you can skip that dependency.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="program-skeleton">Program Skeleton</h2>
<div class="codetabs">
<div data-lang="java">
<p>As we already saw in the example, Flink programs look like regular Java
programs with a <code>main()</code> method. Each program consists of the same basic parts:</p>
<ol>
<li>Obtain an <code>ExecutionEnvironment</code>,</li>
<li>Load/create the initial data,</li>
<li>Specify transformations on this data,</li>
<li>Specify where to put the results of your computations, and</li>
<li>Trigger the program execution</li>
</ol>
<p>We will now give an overview of each of those steps; please refer to the respective sections for
more details. Note that all
<a href="https://github.com/apache/flink/blob/master//flink-java/src/main/java/org/apache/flink/api/java">core classes of the Java API</a>
are found in the package <code>org.apache.flink.api.java</code>.</p>
<p>The <code>ExecutionEnvironment</code> is the basis for all Flink programs. You can
obtain one using these static methods on class <code>ExecutionEnvironment</code>:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">getExecutionEnvironment</span><span class="o">()</span>
<span class="n">createLocalEnvironment</span><span class="o">()</span>
<span class="n">createLocalEnvironment</span><span class="o">(</span><span class="kt">int</span> <span class="n">parallelism</span><span class="o">)</span>
<span class="n">createLocalEnvironment</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">customConfiguration</span><span class="o">)</span>
<span class="n">createRemoteEnvironment</span><span class="o">(</span><span class="n">String</span> <span class="n">host</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">,</span> <span class="n">String</span><span class="o">...</span> <span class="n">jarFiles</span><span class="o">)</span>
<span class="n">createRemoteEnvironment</span><span class="o">(</span><span class="n">String</span> <span class="n">host</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">,</span> <span class="kt">int</span> <span class="n">parallelism</span><span class="o">,</span> <span class="n">String</span><span class="o">...</span> <span class="n">jarFiles</span><span class="o">)</span></code></pre></div>
<p>Typically, you only need to use <code>getExecutionEnvironment()</code>, since this
will do the right thing depending on the context: if you are executing
your program inside an IDE or as a regular Java program it will create
a local environment that will execute your program on your local machine. If
you created a JAR file from your program and invoke it through the <a href="cli.html">command line</a>
or the <a href="web_client.html">web interface</a>,
the Flink cluster manager will execute your main method and <code>getExecutionEnvironment()</code> will return
an execution environment for executing your program on a cluster.</p>
<p>To specify data sources, the execution environment offers several methods
for reading from files: you can read them line by line,
as CSV files, or using completely custom data input formats. To just read
a text file as a sequence of lines, you can use:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="s">&quot;file:///path/to/file&quot;</span><span class="o">);</span></code></pre></div>
<p>This will give you a DataSet on which you can then apply transformations. For
more information on data sources and input formats, please refer to
<a href="#data-sources">Data Sources</a>.</p>
<p>Once you have a DataSet you can apply transformations to create a new
DataSet which you can then write to a file, transform again, or
combine with other DataSets. You apply transformations by calling
methods on DataSet with your own custom transformation function. For example,
a map transformation looks like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">tokenized</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
<p>This will create a new DataSet by converting every String in the original
set to an Integer. For more information and a list of all the transformations,
please refer to <a href="#transformations">Transformations</a>.</p>
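<p>The one-in, one-out contract of a map function can be tried out without a cluster. The following is a minimal plain Java sketch, not Flink code: <code>ElementMapper</code> is a hypothetical stand-in for a map function interface, and <code>applyToAll</code> and <code>parseAll</code> are illustrative helpers that operate on an ordinary <code>List</code> rather than a DataSet.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MapSemanticsSketch {

    // Hypothetical stand-in for a one-in, one-out map function; this is not Flink's MapFunction.
    public interface ElementMapper<I, O> {
        O map(I value);
    }

    // Applies the mapper to every element and emits exactly one output per input.
    public static <I, O> List<O> applyToAll(List<I> input, ElementMapper<I, O> mapper) {
        List<O> result = new ArrayList<>();
        for (I element : input) {
            result.add(mapper.map(element));
        }
        return result;
    }

    // Same per-element logic as the map example: parse each String as an Integer.
    public static List<Integer> parseAll(List<String> input) {
        return applyToAll(input, new ElementMapper<String, Integer>() {
            @Override
            public Integer map(String value) {
                return Integer.parseInt(value);
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> parsed = parseAll(Arrays.asList("1", "22", "333"));
        System.out.println(parsed);
    }
}
```

<p>Because <code>Integer.parseInt</code> throws a <code>NumberFormatException</code> for input that is not a valid integer, a single malformed element makes the whole mapping fail in this sketch; it may be worth validating or filtering the input before parsing.</p>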
<p>Once you have a DataSet containing your final results, you can either write the result
to a file system (HDFS or local) or print it.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">writeAsText</span><span class="o">(</span><span class="n">String</span> <span class="n">path</span><span class="o">)</span>
<span class="n">writeAsCsv</span><span class="o">(</span><span class="n">String</span> <span class="n">path</span><span class="o">)</span>
<span class="n">write</span><span class="o">(</span><span class="n">FileOutputFormat</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">outputFormat</span><span class="o">,</span> <span class="n">String</span> <span class="n">filePath</span><span class="o">)</span>
<span class="n">print</span><span class="o">()</span>
<span class="n">printOnTaskManager</span><span class="o">()</span>
<span class="n">collect</span><span class="o">()</span></code></pre></div>
</div>
<div data-lang="scala">
<p>As we already saw in the example, Flink programs look like regular Scala
programs with a <code>main()</code> method. Each program consists of the same basic parts:</p>
<ol>
<li>Obtain an <code>ExecutionEnvironment</code>,</li>
<li>Load/create the initial data,</li>
<li>Specify transformations on this data,</li>
<li>Specify where to put the results of your computations, and</li>
<li>Trigger the program execution</li>
</ol>
<p>We will now give an overview of each of those steps; please refer to the respective sections for
more details. Note that all core classes of the Scala API are found in the package
<a href="https://github.com/apache/flink/blob/master//flink-scala/src/main/scala/org/apache/flink/api/scala">org.apache.flink.api.scala</a>.</p>
<p>The <code>ExecutionEnvironment</code> is the basis for all Flink programs. You can
obtain one using these static methods on class <code>ExecutionEnvironment</code>:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">getExecutionEnvironment</span>
<span class="k">def</span> <span class="n">createLocalEnvironment</span><span class="o">(</span><span class="n">parallelism</span><span class="k">:</span> <span class="kt">Int</span> <span class="o">=</span> <span class="nc">Runtime</span><span class="o">.</span><span class="n">getRuntime</span><span class="o">.</span><span class="n">availableProcessors</span><span class="o">())</span>
<span class="k">def</span> <span class="n">createLocalEnvironment</span><span class="o">(</span><span class="n">customConfiguration</span><span class="k">:</span> <span class="kt">Configuration</span><span class="o">)</span>
<span class="k">def</span> <span class="n">createRemoteEnvironment</span><span class="o">(</span><span class="n">host</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">jarFiles</span><span class="k">:</span> <span class="kt">String*</span><span class="o">)</span>
<span class="k">def</span> <span class="n">createRemoteEnvironment</span><span class="o">(</span><span class="n">host</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">parallelism</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">jarFiles</span><span class="k">:</span> <span class="kt">String*</span><span class="o">)</span></code></pre></div>
<p>Typically, you only need to use <code>getExecutionEnvironment()</code>, since this
will do the right thing depending on the context: if you are executing
your program inside an IDE or as a regular Scala program, it will create
a local environment that executes your program on your local machine. If
you created a JAR file from your program and invoke it through the <a href="cli.html">command line</a>
or the <a href="web_client.html">web interface</a>,
the Flink cluster manager will execute your main method and <code>getExecutionEnvironment()</code> will return
an execution environment for executing your program on a cluster.</p>
<p>To specify data sources, the execution environment offers several methods
for reading from files: you can read them line by line,
as CSV files, or using completely custom data input formats. To just read
a text file as a sequence of lines, you can use:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="k">val</span> <span class="n">text</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readTextFile</span><span class="o">(</span><span class="s">&quot;file:///path/to/file&quot;</span><span class="o">)</span></code></pre></div>
<p>This will give you a DataSet on which you can then apply transformations. For
more information on data sources and input formats, please refer to
<a href="#data-sources">Data Sources</a>.</p>
<p>Once you have a DataSet you can apply transformations to create a new
DataSet which you can then write to a file, transform again, or
combine with other DataSets. You apply transformations by calling
methods on DataSet with your own custom transformation function. For example,
a map transformation looks like this:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">mapped</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="n">x</span><span class="o">.</span><span class="n">toInt</span> <span class="o">}</span></code></pre></div>
<p>This will create a new DataSet by converting every String in the original
set to an Integer. For more information and a list of all the transformations,
please refer to <a href="#transformations">Transformations</a>.</p>
<p>Once you have a DataSet containing your final results, you can either write the result
to a file system (HDFS or local) or print it.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">writeAsText</span><span class="o">(</span><span class="n">path</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">writeMode</span><span class="k">:</span> <span class="kt">WriteMode</span> <span class="o">=</span> <span class="nc">WriteMode</span><span class="o">.</span><span class="nc">NO_OVERWRITE</span><span class="o">)</span>
<span class="k">def</span> <span class="n">writeAsCsv</span><span class="o">(</span>
<span class="n">filePath</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">rowDelimiter</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span>
<span class="n">fieldDelimiter</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="s">&quot;,&quot;</span><span class="o">,</span>
<span class="n">writeMode</span><span class="k">:</span> <span class="kt">WriteMode</span> <span class="o">=</span> <span class="nc">WriteMode</span><span class="o">.</span><span class="nc">NO_OVERWRITE</span><span class="o">)</span>
<span class="k">def</span> <span class="n">write</span><span class="o">(</span><span class="n">outputFormat</span><span class="k">:</span> <span class="kt">FileOutputFormat</span><span class="o">[</span><span class="kt">T</span><span class="o">],</span>
<span class="n">path</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span>
<span class="n">writeMode</span><span class="k">:</span> <span class="kt">WriteMode</span> <span class="o">=</span> <span class="nc">WriteMode</span><span class="o">.</span><span class="nc">NO_OVERWRITE</span><span class="o">)</span>
<span class="k">def</span> <span class="n">printOnTaskManager</span><span class="o">()</span>
<span class="k">def</span> <span class="n">print</span><span class="o">()</span>
<span class="k">def</span> <span class="n">collect</span><span class="o">()</span></code></pre></div>
</div>
</div>
<p>The first two methods (<code>writeAsText()</code> and <code>writeAsCsv()</code>) do as their names suggest; the third one
can be used to specify a custom data output format. Please refer to <a href="#data-sinks">Data Sinks</a> for
more information on writing to files and on custom data output formats.</p>
<p>The <code>print()</code> method is useful for developing and debugging. It outputs the contents of the DataSet
to standard output (on the JVM starting the Flink execution). <strong>NOTE</strong> The behavior of the <code>print()</code>
method changed with Flink 0.9.x. Previously it printed to the log files of the workers; now it
sends the DataSet results to the client and prints them there.</p>
<p><code>collect()</code> retrieves the DataSet from the cluster to the local JVM. The <code>collect()</code> method
returns a <code>List</code> containing the elements.</p>
<p>Both <code>print()</code> and <code>collect()</code> will trigger the execution of the program. You don't need to further call <code>execute()</code>.</p>
<p><strong>NOTE</strong> <code>print()</code> and <code>collect()</code> retrieve the data from the cluster to the client. Currently,
the data sizes you can retrieve with <code>collect()</code> are limited due to our RPC system. It is not advised
to collect DataSets larger than 10 MB.</p>
<p>There is also a <code>printOnTaskManager()</code> method which will print the DataSet contents on the TaskManager
(so you have to get them from the log file). The <code>printOnTaskManager()</code> method will not trigger a
program execution.</p>
<p>Once you have specified the complete program, you need to <strong>trigger the program execution</strong>. You can call
<code>execute()</code> directly on the <code>ExecutionEnvironment</code>, or trigger the execution implicitly with
<code>collect()</code> or <code>print()</code>.
Depending on the type of the <code>ExecutionEnvironment</code>, the program is either executed on your local
machine or submitted for execution on a cluster.</p>
<p>Note that you cannot call both <code>print()</code> (or <code>collect()</code>) and <code>execute()</code> at the end of the program, because <code>print()</code> and <code>collect()</code> already trigger the execution.</p>
<p>The <code>execute()</code> method returns a <code>JobExecutionResult</code>, which includes execution times and
accumulator results. <code>print()</code> and <code>collect()</code> do not return this result, but it can be
accessed through the <code>getLastJobExecutionResult()</code> method.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="dataset-abstraction">DataSet abstraction</h2>
<p>The batch processing APIs of Flink are centered around the <code>DataSet</code> abstraction. A <code>DataSet</code> is only
an abstract representation of a set of data that can contain duplicates.</p>
<p>Also note that Flink does not always physically create (materialize) each DataSet at runtime. This
depends on the runtime in use, the configuration, and optimizer decisions.</p>
<p>The Flink runtime does not always need to materialize DataSets because it uses a streaming runtime model.</p>
<p>DataSets are only materialized to avoid distributed deadlocks (at points where the data flow graph branches out and joins again later) or if the execution mode has explicitly been set to a batched execution.</p>
<p>When using Flink on Tez, all DataSets are materialized.</p>
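<p>The difference between a pipelined and a materialized intermediate result can be pictured with a
small plain-Java model. This is only a sketch and uses no Flink classes: the class
<code>MaterializationSketch</code>, its two methods, and the two steps "A" (double the value) and "B" (add one)
are hypothetical names chosen for illustration. The returned trace records the order in which the steps run.</p>

```java
// Simplified plain-Java model (no Flink classes); MaterializationSketch is a hypothetical name.
class MaterializationSketch {

    // Pipelined: each element passes through step A and step B before the
    // next element is read, so the output of step A is never stored as a whole.
    static String pipelined(int... input) {
        StringBuilder trace = new StringBuilder();
        for (int x : input) {
            int doubled = x * 2;                                  // step A
            trace.append("A").append(x).append(' ');
            trace.append("B").append(doubled + 1).append(' ');    // step B
        }
        return trace.toString().trim();
    }

    // Materialized: step A runs for all elements and stores its complete
    // output before step B starts.
    static String materialized(int... input) {
        StringBuilder trace = new StringBuilder();
        int[] intermediate = new int[input.length];
        int i = 0;
        for (int x : input) {
            intermediate[i++] = x * 2;                            // step A
            trace.append("A").append(x).append(' ');
        }
        for (int doubled : intermediate) {
            trace.append("B").append(doubled + 1).append(' ');    // step B
        }
        return trace.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(pipelined(1, 2));
        System.out.println(materialized(1, 2));
    }
}
```

<p>In the pipelined variant the steps interleave per element, while the materialized variant finishes
step A for the whole input first. The real runtime works on parallel partitions and network channels,
so this model only conveys the ordering idea.</p>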
<p><a href="#top">Back to top</a></p>
<h2 id="lazy-evaluation">Lazy Evaluation</h2>
<p>All Flink programs are executed lazily: When the program's main method is executed, the data loading
and transformations do not happen directly. Rather, each operation is created and added to the
program's plan. The operations are actually executed when the execution is explicitly triggered by
an <code>execute()</code> call on the ExecutionEnvironment object. Also, <code>collect()</code> and <code>print()</code> will trigger
the job execution. Whether the program is executed locally or on a cluster depends
on the environment of the program.</p>
<p>The lazy evaluation lets you construct sophisticated programs that Flink executes as one
holistically planned unit.</p>
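<p>The idea of recording operations in a plan and running them only on an explicit trigger can be
sketched in plain Java. This is a toy model and not Flink code: the class <code>LazyPlanSketch</code>, its
<code>map()</code> and <code>execute()</code> methods, and the <code>invocations</code> counter are hypothetical names
that merely mirror the behavior described above.</p>

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

// Toy model of lazy evaluation (no Flink classes); LazyPlanSketch is a hypothetical name.
class LazyPlanSketch {

    // The recorded plan: starts as the identity and grows with each declared operation.
    private IntUnaryOperator plan = IntUnaryOperator.identity();

    // Counts how often a user function has actually been called.
    int invocations = 0;

    // Declaring a map step only extends the plan; the user function is not called here.
    LazyPlanSketch map(IntUnaryOperator userFunction) {
        plan = plan.andThen(x -> {
            invocations++;
            return userFunction.applyAsInt(x);
        });
        return this;
    }

    // Only execute() runs the recorded steps over the input.
    int[] execute(int... input) {
        int[] result = new int[input.length];
        int i = 0;
        for (int value : input) {
            result[i++] = plan.applyAsInt(value);
        }
        return result;
    }

    public static void main(String[] args) {
        LazyPlanSketch program = new LazyPlanSketch().map(x -> x + 1).map(x -> x * 2);
        System.out.println(program.invocations);                       // no user function has run yet
        System.out.println(Arrays.toString(program.execute(1, 2, 3))); // the plan runs here
    }
}
```

<p>Because the whole plan is known before anything runs, a real system can optimize across operations;
the toy model above only composes the functions and does no such planning.</p>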
<p><a href="#top">Back to top</a></p>
<h2 id="transformations">Transformations</h2>
<p>Data transformations transform one or more DataSets into a new DataSet. Programs can combine
multiple transformations into sophisticated assemblies.</p>
<p>This section gives a brief overview of the available transformations. The <a href="dataset_transformations.html">transformations
documentation</a> has a full description of all transformations with
examples.</p>
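<p>As a rough analogy, the element-level contracts of Map, FlatMap, Filter, and Reduce listed in the
table can be tried out locally with <code>java.util.stream.IntStream</code>. The sketch below uses no
Flink classes, the class name <code>TransformationSemanticsSketch</code> and the concrete functions are
made up for illustration, and the analogy is loose: for example, the stream-based reduce here starts from
an identity value of 0, which is a choice of this sketch.</p>

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Plain java.util.stream analogy (no Flink classes); TransformationSemanticsSketch is a hypothetical name.
class TransformationSemanticsSketch {

    // Map: exactly one output element per input element.
    static int[] map(int... in) {
        return IntStream.of(in).map(x -> x * 10).toArray();
    }

    // FlatMap: zero, one, or more output elements per input element
    // (here: x copies of x, so 0 produces nothing).
    static int[] flatMap(int... in) {
        return IntStream.of(in).flatMap(x -> IntStream.range(0, x).map(i -> x)).toArray();
    }

    // Filter: retains the elements for which the predicate returns true.
    static int[] filter(int... in) {
        return IntStream.of(in).filter(x -> x > 1000).toArray();
    }

    // Reduce: repeatedly combines two elements into one.
    static int reduce(int... in) {
        return IntStream.of(in).reduce(0, (a, b) -> a + b);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(map(1, 2, 3)));
        System.out.println(Arrays.toString(flatMap(0, 1, 2)));
        System.out.println(Arrays.toString(filter(5, 1000, 1001, 2000)));
        System.out.println(reduce(1, 2, 3, 4));
    }
}
```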
<div class="codetabs">
<div data-lang="java">
<p><br /></p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Map</strong></td>
<td>
<p>Takes one element and produces one element.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> <span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>FlatMap</strong></td>
<td>
<p>Takes one element and produces zero, one, or more elements. </p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">s</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>MapPartition</strong></td>
<td>
<p>Transforms a parallel partition in a single function call. The function gets the partition
as an <code>Iterable</code> stream and can produce an arbitrary number of result values. The number of
elements in each partition depends on the degree of parallelism and previous operations.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">mapPartition</span><span class="o">(</span><span class="k">new</span> <span class="n">MapPartitionFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapPartition</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="kt">long</span> <span class="n">c</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
<span class="n">c</span><span class="o">++;</span>
<span class="o">}</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">c</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Filter</strong></td>
<td>
<p>Evaluates a boolean function for each element and retains those for which the function
returns true.<br />
<strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
can lead to incorrect results.
</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="mi">1000</span><span class="o">;</span> <span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a group of elements into a single element by repeatedly combining two elements
into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="n">ReduceFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Integer</span> <span class="n">a</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">b</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">;</span> <span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>ReduceGroup</strong></td>
<td>
<p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
full data set, or on a grouped data set.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">GroupReduceFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="kt">int</span> <span class="n">prefixSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="k">for</span> <span class="o">(</span><span class="n">Integer</span> <span class="n">i</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
<span class="n">prefixSum</span> <span class="o">+=</span> <span class="n">i</span><span class="o">;</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">prefixSum</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Aggregate</strong></td>
<td>
<p>Aggregates a group of values into a single value. Aggregation functions can be thought of
as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped
data set.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">SUM</span><span class="o">,</span> <span class="mi">0</span><span class="o">).</span><span class="na">and</span><span class="o">(</span><span class="n">MIN</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span></code></pre></div>
<p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">andMin</span><span class="o">(</span><span class="mi">2</span><span class="o">);</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Join</strong></td>
<td>
<p>Joins two data sets by creating all pairs of elements that are equal on their keys.
Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
elements. See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">result</span> <span class="o">=</span> <span class="n">input1</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">input2</span><span class="o">)</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="c1">// key of the first input (tuple field 0)</span>
<span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> <span class="c1">// key of the second input (tuple field 1)</span></code></pre></div>
<p>You can specify the way that the runtime executes the join via <i>Join Hints</i>. The hints
describe whether the join happens through partitioning or broadcasting, and whether it uses
a sort-based or a hash-based algorithm. Please refer to the
<a href="dataset_transformations.html#join-algorithm-hints">Transformations Guide</a> for
a list of possible hints and an example.
If no hint is specified, the system will try to estimate the input sizes and
pick the best strategy according to those estimates.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// This executes a join by broadcasting the first data set</span>
<span class="c1">// using a hash table for the broadcasted data</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">input1</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">input2</span><span class="o">,</span> <span class="n">JoinHint</span><span class="o">.</span><span class="na">BROADCAST_HASH_FIRST</span><span class="o">)</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span></code></pre></div>
<p>Note that the join transformation works only for equi-joins. Other join types, for example outer joins, need to be expressed using CoGroup.</p>
</td>
</tr>
<tr>
<td><strong>CoGroup</strong></td>
<td>
<p>The two-dimensional variant of the reduce operation. Groups each input on one or more
fields and then joins the groups. The transformation function is called per pair of groups.
See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data1</span><span class="o">.</span><span class="na">coGroup</span><span class="o">(</span><span class="n">data2</span><span class="o">)</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="n">CoGroupFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">coGroup</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">in1</span><span class="o">,</span> <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">in2</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(...);</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Cross</strong></td>
<td>
<p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
elements. Optionally uses a CrossFunction to turn the pair of elements into a single
element.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">data1</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">data2</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">data1</span><span class="o">.</span><span class="na">cross</span><span class="o">(</span><span class="n">data2</span><span class="o">);</span></code></pre></div>
<p>Note: Cross is potentially a <b>very</b> compute-intensive operation which can challenge even large compute clusters! It is advised to hint the system with the DataSet sizes by using <i>crossWithTiny()</i> and <i>crossWithHuge()</i>.</p>
</td>
</tr>
<tr>
<td><strong>Union</strong></td>
<td>
<p>Produces the union of two data sets. This operation happens implicitly if more than one
data set is used for a specific function input.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">data1</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">data2</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">data1</span><span class="o">.</span><span class="na">union</span><span class="o">(</span><span class="n">data2</span><span class="o">);</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Rebalance</strong></td>
<td>
<p>Evenly rebalances the parallel partitions of a data set to eliminate data skew. Only Map-like transformations may follow a rebalance transformation.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">rebalance</span><span class="o">()</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">Mapper</span><span class="o">());</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Hash-Partition</strong></td>
<td>
<p>Hash-partitions a data set on a given key. Keys can be specified as key-selector functions or field position keys.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">partitionByHash</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">mapPartition</span><span class="o">(</span><span class="k">new</span> <span class="nf">PartitionMapper</span><span class="o">());</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Custom Partitioning</strong></td>
<td>
<p>Manually specify a partitioning over the data.
<br />
<i>Note</i>: This method works only on single field keys.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">partitionCustom</span><span class="o">(</span><span class="n">Partitioner</span><span class="o">&lt;</span><span class="n">K</span><span class="o">&gt;</span> <span class="n">partitioner</span><span class="o">,</span> <span class="n">key</span><span class="o">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Sort Partition</strong></td>
<td>
<p>Locally sorts all partitions of a data set on a specified field in a specified order.
Fields can be specified as tuple positions or field expressions.
Sorting on multiple fields is done by chaining sortPartition() calls.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">sortPartition</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">)</span>
<span class="o">.</span><span class="na">mapPartition</span><span class="o">(</span><span class="k">new</span> <span class="nf">PartitionMapper</span><span class="o">());</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>First-n</strong></td>
<td>
<p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="c1">// regular data set</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">result1</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">first</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
<span class="c1">// grouped data set</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">result2</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">first</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
<span class="c1">// grouped-sorted data set</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">result3</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">)</span>
<span class="o">.</span><span class="na">first</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span></code></pre></div>
</td>
</tr>
</tbody>
</table>
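<p>To make the difference between the Rebalance and Hash-Partition rows above more concrete, the following plain-Java sketch assigns elements to partitions in both ways. It is only a conceptual illustration under simplifying assumptions (in-memory lists, round-robin for rebalancing, <code>hashCode()</code> for hashing); the class <code>PartitioningSketch</code> and its methods are hypothetical and are neither part of the Flink API nor a description of how Flink implements these operators.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only: not part of the Flink API.
class PartitioningSketch {

    // Round-robin assignment: element i goes to partition i % parallelism,
    // so partition sizes differ by at most one, whatever the values are.
    static <T> List<List<T>> rebalance(List<T> input, int parallelism) {
        List<List<T>> partitions = emptyPartitions(parallelism);
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % parallelism).add(input.get(i));
        }
        return partitions;
    }

    // Hash assignment: elements with equal keys always land in the same partition,
    // but partition sizes depend on the key distribution.
    static <T, K> List<List<T>> partitionByHash(List<T> input,
                                                Function<T, K> keySelector,
                                                int parallelism) {
        List<List<T>> partitions = emptyPartitions(parallelism);
        for (T element : input) {
            int target = Math.floorMod(keySelector.apply(element).hashCode(), parallelism);
            partitions.get(target).add(element);
        }
        return partitions;
    }

    private static <T> List<List<T>> emptyPartitions(int parallelism) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<T>());
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "a", "b");
        System.out.println("round-robin: " + rebalance(words, 4));
        System.out.println("by hash:     " + partitionByHash(words, w -> w, 4));
    }
}
```

<p>In this sketch, rebalancing evens out partition sizes but scatters equal keys, while hash partitioning keeps equal keys together at the cost of possible skew.</p>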
<hr />
<p>The following transformations are available on data sets of Tuples:</p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Project</strong></td>
<td>
<p>Selects a subset of fields from the tuples.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span><span class="mi">0</span><span class="o">);</span></code></pre></div>
</td>
</tr>
</tbody>
</table>
</div>
<div data-lang="scala">
<p><br /></p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Map</strong></td>
<td>
<p>Takes one element and produces one element.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="n">x</span><span class="o">.</span><span class="n">toInt</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>FlatMap</strong></td>
<td>
<p>Takes one element and produces zero, one, or more elements. </p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="n">str</span> <span class="k">=&gt;</span> <span class="n">str</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>MapPartition</strong></td>
<td>
<p>Transforms a parallel partition in a single function call. The function gets the partition
as an <code>Iterator</code> and can produce an arbitrary number of result values. The number of
elements in each partition depends on the degree-of-parallelism and previous operations.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">mapPartition</span> <span class="o">{</span> <span class="n">in</span> <span class="k">=&gt;</span> <span class="n">in</span> <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Filter</strong></td>
<td>
<p>Evaluates a boolean function for each element and retains those for which the function
returns true.<br />
<strong>IMPORTANT:</strong> The system assumes that the function does not modify the element on which the predicate is applied.
Violating this assumption can lead to incorrect results.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">filter</span> <span class="o">{</span> <span class="k">_</span> <span class="o">&gt;</span> <span class="mi">1000</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a group of elements into a single element by repeatedly combining two elements
into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">reduce</span> <span class="o">{</span> <span class="k">_</span> <span class="o">+</span> <span class="k">_</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>ReduceGroup</strong></td>
<td>
<p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
full data set, or on a grouped data set.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">reduceGroup</span> <span class="o">{</span> <span class="n">elements</span> <span class="k">=&gt;</span> <span class="n">elements</span><span class="o">.</span><span class="n">sum</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Aggregate</strong></td>
<td>
<p>Aggregates a group of values into a single value. Aggregation functions can be thought of
as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped
data set.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">output</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">aggregate</span><span class="o">(</span><span class="nc">SUM</span><span class="o">,</span> <span class="mi">0</span><span class="o">).</span><span class="n">aggregate</span><span class="o">(</span><span class="nc">MIN</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span></code></pre></div>
<p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">output</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">min</span><span class="o">(</span><span class="mi">2</span><span class="o">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Join</strong></td>
<td>
<p>Joins two data sets by creating all pairs of elements that are equal on their keys.
Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
elements. See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// In this case tuple fields are used as keys. &quot;0&quot; is the join field on the first tuple</span>
<span class="c1">// &quot;1&quot; is the join field on the second tuple.</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">input1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">input2</span><span class="o">).</span><span class="n">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span></code></pre></div>
<p>You can specify the way that the runtime executes the join via <i>Join Hints</i>. The hints
describe whether the join happens through partitioning or broadcasting, and whether it uses
a sort-based or a hash-based algorithm. Please refer to the
<a href="dataset_transformations.html#join-algorithm-hints">Transformations Guide</a> for
a list of possible hints and an example.
If no hint is specified, the system will try to estimate the input sizes and
pick the best strategy according to those estimates.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// This executes a join by broadcasting the first data set</span>
<span class="c1">// using a hash table for the broadcasted data</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">input1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">input2</span><span class="o">,</span> <span class="nc">JoinHint</span><span class="o">.</span><span class="nc">BROADCAST_HASH_FIRST</span><span class="o">)</span>
<span class="o">.</span><span class="n">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span></code></pre></div>
<p>Note that the join transformation works only for equi-joins. Other join types, for example outer joins, need to be expressed using CoGroup.</p>
</td>
</tr>
<tr>
<td><strong>CoGroup</strong></td>
<td>
<p>The two-dimensional variant of the reduce operation. Groups each input on one or more
fields and then joins the groups. The transformation function is called per pair of groups.
See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data1</span><span class="o">.</span><span class="n">coGroup</span><span class="o">(</span><span class="n">data2</span><span class="o">).</span><span class="n">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Cross</strong></td>
<td>
<p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
elements. Optionally uses a CrossFunction to turn the pair of elements into a single
element.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">data1</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">data2</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">result</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="n">data1</span><span class="o">.</span><span class="n">cross</span><span class="o">(</span><span class="n">data2</span><span class="o">)</span></code></pre></div>
<p>Note: Cross is potentially a <b>very</b> compute-intensive operation which can challenge even large compute clusters! It is advised to hint the system about the DataSet sizes by using <i>crossWithTiny()</i> and <i>crossWithHuge()</i>.</p>
</td>
</tr>
<tr>
<td><strong>Union</strong></td>
<td>
<p>Produces the union of two data sets.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">union</span><span class="o">(</span><span class="n">data2</span><span class="o">)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Rebalance</strong></td>
<td>
<p>Evenly rebalances the parallel partitions of a data set to eliminate data skew. Only Map-like transformations may follow a rebalance transformation.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">data1</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">result</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="n">data1</span><span class="o">.</span><span class="n">rebalance</span><span class="o">().</span><span class="n">map</span><span class="o">(...)</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Hash-Partition</strong></td>
<td>
<p>Hash-partitions a data set on a given key. Keys can be specified as key-selector functions, tuple positions
or case class fields.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">in</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">in</span><span class="o">.</span><span class="n">partitionByHash</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">mapPartition</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>Sort Partition</strong></td>
<td>
<p>Locally sorts all partitions of a data set on a specified field in a specified order.
Fields can be specified as tuple positions or field expressions.
Sorting on multiple fields is done by chaining sortPartition() calls.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">in</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">in</span><span class="o">.</span><span class="n">sortPartition</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">).</span><span class="n">mapPartition</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div>
</td>
</tr>
<tr>
<td><strong>First-n</strong></td>
<td>
<p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions,
tuple positions or case class fields.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">in</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="c1">// regular data set</span>
<span class="k">val</span> <span class="n">result1</span> <span class="k">=</span> <span class="n">in</span><span class="o">.</span><span class="n">first</span><span class="o">(</span><span class="mi">3</span><span class="o">)</span>
<span class="c1">// grouped data set</span>
<span class="k">val</span> <span class="n">result2</span> <span class="k">=</span> <span class="n">in</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">first</span><span class="o">(</span><span class="mi">3</span><span class="o">)</span>
<span class="c1">// grouped-sorted data set</span>
<span class="k">val</span> <span class="n">result3</span> <span class="k">=</span> <span class="n">in</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">).</span><span class="n">first</span><span class="o">(</span><span class="mi">3</span><span class="o">)</span></code></pre></div>
</td>
</tr>
</tbody>
</table>
</div>
</div>
<p>The <a href="#parallel-execution">parallelism</a> of a transformation can be defined by <code>setParallelism(int)</code> while
<code>name(String)</code> assigns a custom name to a transformation which is helpful for debugging. The same is
possible for <a href="#data-sources">Data Sources</a> and <a href="#data-sinks">Data Sinks</a>.</p>
<p><code>withParameters(Configuration)</code> passes Configuration objects, which can be accessed from the <code>open()</code> method inside the user function.</p>
<p><a href="#top">Back to Top</a></p>
<h2 id="specifying-keys">Specifying Keys</h2>
<p>Some transformations (Join, CoGroup) require that a key is defined on
their input DataSets. Other transformations (Reduce, GroupReduce,
Aggregate) allow the DataSet to be grouped on a key before they are
applied.</p>
<p>A DataSet is grouped as follows:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;...&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;...&gt;</span> <span class="n">reduced</span> <span class="o">=</span> <span class="n">input</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="cm">/*define key here*/</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">);</span></code></pre></div>
<p>The data model of Flink is not based on key-value pairs. Therefore,
you do not need to physically pack the data set types into keys and
values. Keys are “virtual”: they are defined as functions over the
actual data to guide the grouping operator.</p>
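<p>The idea of “virtual” keys can be sketched in plain Java without any Flink classes. The example below is only an illustration under assumed names (<code>VirtualKeySketch</code>, <code>Visit</code> and <code>groupByKey</code> are hypothetical): the elements stay ordinary objects, and the key is merely a function that is evaluated on each element to decide which group it belongs to.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only: not part of the Flink API.
class VirtualKeySketch {

    // An ordinary data type; it is not wrapped into a key-value pair.
    static class Visit {
        final String country;
        final int seconds;

        Visit(String country, int seconds) {
            this.country = country;
            this.seconds = seconds;
        }
    }

    // Groups elements by evaluating the key function on every element.
    // The key exists only as the result of that function call.
    static <T, K> Map<K, List<T>> groupByKey(List<T> data, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : data) {
            K key = keySelector.apply(element);
            groups.computeIfAbsent(key, k -> new ArrayList<T>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Visit> visits = Arrays.asList(
                new Visit("DE", 30), new Visit("FR", 10), new Visit("DE", 5));
        Map<String, List<Visit>> byCountry = groupByKey(visits, v -> v.country);
        for (Map.Entry<String, List<Visit>> group : byCountry.entrySet()) {
            System.out.println(group.getKey() + " -> " + group.getValue().size() + " visits");
        }
    }
}
```

<p>Grouping the same data on a different property only requires a different key function; the <code>Visit</code> type itself does not change.</p>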
<h3 class="no_toc" id="define-keys-for-tuples">Define keys for Tuples</h3>
<p>The simplest case is grouping a data set of Tuples on one or more
fields of the Tuple:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">grouped</span> <span class="o">=</span> <span class="n">input</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">);</span></code></pre></div>
<p>The data set is grouped on the first field of the tuples (the one of
Integer type). The GroupReduce function will thus receive groups of tuples with
the same value in the first field.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">grouped</span> <span class="o">=</span> <span class="n">input</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">);</span></code></pre></div>
<p>The data set is grouped on the composite key consisting of the first and the
second field. Therefore, the reduce function is applied to groups of tuples
with the same value for both fields.</p>
<p>A note on nested Tuples: If you have a DataSet with a nested tuple, such as:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Float</span><span class="o">&gt;,</span><span class="n">String</span><span class="o">,</span><span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">ds</span><span class="o">;</span></code></pre></div>
<p>Specifying <code>groupBy(0)</code> will cause the system to use the full <code>Tuple2</code> as a key (with the Integer and Float being the key). If you want to “navigate” into the nested <code>Tuple2</code>, you have to use field expression keys, which are explained below.</p>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Long</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">grouped</span> <span class="k">=</span> <span class="n">input</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="n">reduceGroup</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">)</span></code></pre></div>
<p>The data set is grouped on the first field of the tuples (the one of
Int type). The GroupReduce function will thus receive groups of tuples with
the same value in the first field.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Long</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">grouped</span> <span class="k">=</span> <span class="n">input</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="n">reduce</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">)</span></code></pre></div>
<p>The data set is grouped on the composite key consisting of the first and the
second field. Therefore, the reduce function is applied to groups of tuples
with the same value for both fields.</p>
<p>A note on nested Tuples: If you have a DataSet with a nested tuple, such as:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">ds</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[((</span><span class="kt">Int</span>, <span class="kt">Float</span><span class="o">)</span>, <span class="kt">String</span>, <span class="kt">Long</span><span class="o">)]</span></code></pre></div>
<p>Specifying <code>groupBy(0)</code> will cause the system to use the full <code>Tuple2</code> as a key (with the Int and
Float being the key). If you want to “navigate” into the nested <code>Tuple2</code>, you have to use field expression keys, which are explained below.</p>
</div>
</div>
<h3 class="no_toc" id="define-keys-using-field-expressions">Define keys using Field Expressions</h3>
<p>Starting from release 0.7-incubating, you can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping. In addition, field expressions can be used to define <a href="#semantic-annotations">semantic function annotations</a>.</p>
<p>Field expressions make it very easy to select fields in (nested) composite types such as <a href="#tuples-and-case-classes">Tuple</a> and <a href="#pojos">POJO</a> types.</p>
<div class="codetabs">
<div data-lang="java">
<p>In the example below, we have a <code>WC</code> POJO with two fields “word” and “count”. To group by the field <code>word</code>, we just pass its name to the <code>groupBy()</code> function.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// some ordinary POJO (Plain old Java Object)</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WC</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span>
<span class="o">}</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">).</span><span class="na">reduce</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">);</span></code></pre></div>
<p><strong>Field Expression Syntax</strong>:</p>
<ul>
<li>
<p>Select POJO fields by their field name. For example, <code>"user"</code> refers to the “user” field of a POJO type.</p>
</li>
<li>
<p>Select Tuple fields by their field name or 0-offset field index. For example <code>"f0"</code> and <code>"5"</code> refer to the first and sixth field of a Java Tuple type, respectively.</p>
</li>
<li>
<p>You can select nested fields in POJOs and Tuples. For example, <code>"user.zip"</code> refers to the “zip” field of a POJO which is stored in the “user” field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported, such as <code>"f1.user.zip"</code> or <code>"user.f3.1.zip"</code>.</p>
</li>
<li>
<p>You can select the full type using the <code>"*"</code> wildcard expression. This also works for types that are not Tuple or POJO types.</p>
</li>
</ul>
<p><strong>Field Expression Example</strong>:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WC</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">ComplexNestedClass</span> <span class="n">complex</span><span class="o">;</span> <span class="c1">//nested POJO</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span>
<span class="c1">// getter / setter for private field (count)</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="nf">getCount</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">count</span><span class="o">;</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setCount</span><span class="o">(</span><span class="kt">int</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">count</span> <span class="o">=</span> <span class="n">c</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">ComplexNestedClass</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="n">someNumber</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">float</span> <span class="n">someFloat</span><span class="o">;</span>
<span class="kd">public</span> <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">word</span><span class="o">;</span>
<span class="kd">public</span> <span class="n">IntWritable</span> <span class="n">hadoopCitizen</span><span class="o">;</span>
<span class="o">}</span></code></pre></div>
<p>These are valid field expressions for the example code above:</p>
<ul>
<li>
<p><code>"count"</code>: The count field in the <code>WC</code> class.</p>
</li>
<li>
<p><code>"complex"</code>: Recursively selects all fields of the field complex of POJO type <code>ComplexNestedClass</code>.</p>
</li>
<li>
<p><code>"complex.word.f2"</code>: Selects the last field of the nested <code>Tuple3</code>.</p>
</li>
<li>
<p><code>"complex.hadoopCitizen"</code>: Selects the Hadoop <code>IntWritable</code> type.</p>
</li>
</ul>
</div>
<div data-lang="scala">
<p>In the example below, we have a <code>WC</code> POJO with two fields, “word” and “count”. To group by the field <code>word</code>, we just pass its name to the <code>groupBy()</code> function.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// some ordinary POJO (Plain old Java Object)</span>
<span class="kd">class</span> <span class="nf">WC</span><span class="o">(</span><span class="n">var</span> <span class="nl">word:</span> <span class="n">String</span><span class="o">,</span> <span class="n">var</span> <span class="nl">count:</span> <span class="n">Int</span><span class="o">)</span> <span class="o">{</span>
<span class="n">def</span> <span class="nf">this</span><span class="o">()</span> <span class="o">{</span> <span class="k">this</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span> <span class="o">}</span>
<span class="o">}</span>
<span class="n">val</span> <span class="nl">words:</span> <span class="n">DataSet</span><span class="o">[</span><span class="n">WC</span><span class="o">]</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">val</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">).</span><span class="na">reduce</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">)</span>
<span class="c1">// or, as a case class, which is less typing</span>
<span class="k">case</span> <span class="kd">class</span> <span class="nf">WC</span><span class="o">(</span><span class="nl">word:</span> <span class="n">String</span><span class="o">,</span> <span class="nl">count:</span> <span class="n">Int</span><span class="o">)</span>
<span class="n">val</span> <span class="nl">words:</span> <span class="n">DataSet</span><span class="o">[</span><span class="n">WC</span><span class="o">]</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">val</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">).</span><span class="na">reduce</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">)</span></code></pre></div>
<p><strong>Field Expression Syntax</strong>:</p>
<ul>
<li>
<p>Select POJO fields by their field name. For example, <code>"user"</code> refers to the “user” field of a POJO type.</p>
</li>
<li>
<p>Select Tuple fields by their 1-offset field name or 0-offset field index. For example <code>"_1"</code> and <code>"5"</code> refer to the first and sixth field of a Scala Tuple type, respectively.</p>
</li>
<li>
<p>You can select nested fields in POJOs and Tuples. For example, <code>"user.zip"</code> refers to the “zip” field of a POJO which is stored in the “user” field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported, such as <code>"_2.user.zip"</code> or <code>"user._4.1.zip"</code>.</p>
</li>
<li>
<p>You can select the full type using the <code>"_"</code> wildcard expression. This also works for types that are not Tuple or POJO types.</p>
</li>
</ul>
<p><strong>Field Expression Example</strong>:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">WC</span><span class="o">(</span><span class="k">var</span> <span class="n">complex</span><span class="k">:</span> <span class="kt">ComplexNestedClass</span><span class="o">,</span> <span class="k">var</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="o">{</span>
<span class="k">def</span> <span class="k">this</span><span class="o">()</span> <span class="o">{</span> <span class="k">this</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span> <span class="o">}</span>
<span class="o">}</span>
<span class="k">class</span> <span class="nc">ComplexNestedClass</span><span class="o">(</span>
<span class="k">var</span> <span class="n">someNumber</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span>
<span class="k">var</span> <span class="n">someFloat</span><span class="k">:</span> <span class="kt">Float</span><span class="o">,</span>
<span class="k">var</span> <span class="n">word</span><span class="k">:</span> <span class="o">(</span><span class="kt">Long</span><span class="o">,</span> <span class="kt">Long</span><span class="o">,</span> <span class="nc">String</span><span class="o">),</span>
<span class="k">var</span> <span class="n">hadoopCitizen</span><span class="k">:</span> <span class="kt">IntWritable</span><span class="o">)</span> <span class="o">{</span>
<span class="k">def</span> <span class="k">this</span><span class="o">()</span> <span class="o">{</span> <span class="k">this</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="s">&quot;&quot;</span><span class="o">),</span> <span class="k">new</span> <span class="nc">IntWritable</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>These are valid field expressions for the example code above:</p>
<ul>
<li>
<p><code>"count"</code>: The count field in the <code>WC</code> class.</p>
</li>
<li>
<p><code>"complex"</code>: Recursively selects all fields of the field complex of POJO type <code>ComplexNestedClass</code>.</p>
</li>
<li>
<p><code>"complex.word._3"</code>: Selects the last field of the nested <code>Tuple3</code>.</p>
</li>
<li>
<p><code>"complex.hadoopCitizen"</code>: Selects the Hadoop <code>IntWritable</code> type.</p>
</li>
</ul>
</div>
</div>
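<p>To make the syntax above more concrete, the following plain-Java sketch resolves a dotted field expression by walking public fields with reflection. It is only an illustration of the idea and not how Flink implements field expressions: the classes <code>Tuple3Sketch</code>, <code>ComplexNestedSketch</code>, <code>WCSketch</code>, and <code>FieldExpressionSketch</code> are hypothetical stand-ins, and the sketch assumes public fields only (no getters, no wildcard, no numeric tuple indexes).</p>

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a Java tuple with fields f0, f1, f2 (not a Flink class).
class Tuple3Sketch<A, B, C> {
    public A f0;
    public B f1;
    public C f2;

    Tuple3Sketch(A f0, B f1, C f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }
}

// Hypothetical nested type, loosely mirroring ComplexNestedClass from the example.
class ComplexNestedSketch {
    public Integer someNumber = 7;
    public float someFloat = 1.5f;
    public Tuple3Sketch<Long, Long, String> word = new Tuple3Sketch<>(1L, 2L, "hello");
}

// Hypothetical outer type; all fields are public to keep the sketch short.
class WCSketch {
    public ComplexNestedSketch complex = new ComplexNestedSketch();
    public int count = 3;
}

class FieldExpressionSketch {
    // Splits the expression at each dot and reads one public field per segment.
    static Object resolve(Object root, String expression) {
        Object current = root;
        for (String segment : expression.split("\\.")) {
            try {
                Field field = current.getClass().getField(segment);
                current = field.get(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Invalid field expression: " + expression, e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        WCSketch wc = new WCSketch();
        System.out.println(resolve(wc, "count"));              // prints 3
        System.out.println(resolve(wc, "complex.word.f2"));    // prints hello
        System.out.println(resolve(wc, "complex.someNumber")); // prints 7
    }
}
```

<p>Each segment of the expression selects one level of nesting, which is why POJO field names and tuple field names can be mixed freely in a single expression.</p>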
<h3 class="no_toc" id="define-keys-using-key-selector-functions">Define keys using Key Selector Functions</h3>
<p>An additional way to define keys is to use “key selector” functions. A key selector function
takes a single data set element as input and returns the key for the element. The key can be of any type and can be derived from arbitrary computations.</p>
<p>The following example shows a key selector function that simply returns the field of an object:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// some ordinary POJO</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WC</span> <span class="o">{</span><span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span> <span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;}</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
<span class="k">new</span> <span class="n">KeySelector</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">getKey</span><span class="o">(</span><span class="n">WC</span> <span class="n">wc</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">wc</span><span class="o">.</span><span class="na">word</span><span class="o">;</span> <span class="o">}</span>
<span class="o">})</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// some ordinary case class</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">WC</span><span class="o">(</span><span class="n">word</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
<span class="k">val</span> <span class="n">words</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">WC</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">words</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span> <span class="k">_</span><span class="o">.</span><span class="n">word</span> <span class="o">).</span><span class="n">reduce</span><span class="o">(</span><span class="cm">/*do something*/</span><span class="o">)</span></code></pre></div>
</div>
</div>
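<p>The effect of a key selector can be sketched without Flink: every element is passed to the selector, and elements that yield equal keys end up in the same group. The snippet below is a minimal plain-Java illustration under that assumption. The class <code>KeySelectorSketch</code> and its methods <code>groupBy</code> and <code>countWords</code> are hypothetical helpers, and <code>java.util.function.Function</code> stands in for Flink's <code>KeySelector</code> interface.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class KeySelectorSketch {
    // Simple value class mirroring the WC POJO from the example above.
    static class WC {
        public String word;
        public int count;

        WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Groups elements by the key that the selector computes for each element.
    static <T, K> Map<K, List<T>> groupBy(List<T> elements, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : elements) {
            K key = keySelector.apply(element);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    // Groups by the word field, then sums the counts within each group.
    static Map<String, Integer> countWords(List<WC> words) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Map.Entry<String, List<WC>> group : groupBy(words, wc -> wc.word).entrySet()) {
            int sum = 0;
            for (WC wc : group.getValue()) {
                sum += wc.count;
            }
            totals.put(group.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<WC> words = Arrays.asList(new WC("hello", 1), new WC("world", 2), new WC("hello", 3));
        System.out.println(countWords(words)); // prints {hello=4, world=2}

        // The key may also be computed, for example the length of the word.
        System.out.println(groupBy(words, wc -> wc.word.length()).keySet()); // prints [5]
    }
}
```

<p>Because the selector is an ordinary function, the key does not have to be a field of the element. Any value computed from the element can serve as the key.</p>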
<p><a href="#top">Back to top</a></p>
<h2 id="passing-functions-to-flink">Passing Functions to Flink</h2>
<p>Operations require user-defined functions. This section lists several ways to provide them.</p>
<div class="codetabs">
<div data-lang="java">
<h4 id="implementing-an-interface">Implementing an interface</h4>
<p>The most basic way is to implement one of the provided interfaces:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">MyMapFunction</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> <span class="o">}</span>
<span class="o">}</span>
<span class="n">data</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyMapFunction</span><span class="o">());</span></code></pre></div>
<h4 id="anonymous-classes">Anonymous classes</h4>
<p>You can pass a function as an anonymous class:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> <span class="o">}</span>
<span class="o">});</span></code></pre></div>
<h4 id="java-8-lambdas">Java 8 Lambdas</h4>
<p>Flink also supports Java 8 Lambdas in the Java API. Please see the full <a href="java8.html">Java 8 Guide</a>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">data</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">data</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">s</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">&quot;http://&quot;</span><span class="o">));</span></code></pre></div>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">data</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">data</span><span class="o">.</span><span class="na">reduce</span><span class="o">((</span><span class="n">i1</span><span class="o">,</span><span class="n">i2</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">i1</span> <span class="o">+</span> <span class="n">i2</span><span class="o">);</span></code></pre></div>
<h4 id="rich-functions">Rich functions</h4>
<p>All transformations that take as argument a user-defined function can
instead take as argument a <em>rich</em> function. For example, instead of</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">MyMapFunction</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> <span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>you can write</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">MyMapFunction</span> <span class="kd">extends</span> <span class="n">RichMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> <span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>and pass the function as usual to a <code>map</code> transformation:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyMapFunction</span><span class="o">());</span></code></pre></div>
<p>Rich functions can also be defined as an anonymous class:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">data</span><span class="o">.</span><span class="na">map</span> <span class="o">(</span><span class="k">new</span> <span class="n">RichMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> <span class="o">}</span>
<span class="o">});</span></code></pre></div>
</div>
<div data-lang="scala">
<h4 id="lambda-functions">Lambda Functions</h4>
<p>As already seen in previous examples, all operations accept lambda functions for describing
the operation:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">data</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="n">data</span><span class="o">.</span><span class="n">filter</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">startsWith</span><span class="o">(</span><span class="s">&quot;http://&quot;</span><span class="o">)</span> <span class="o">}</span></code></pre></div>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">data</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="n">data</span><span class="o">.</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">(</span><span class="n">i1</span><span class="o">,</span><span class="n">i2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">i1</span> <span class="o">+</span> <span class="n">i2</span> <span class="o">}</span>
<span class="c1">// or</span>
<span class="n">data</span><span class="o">.</span><span class="n">reduce</span> <span class="o">{</span> <span class="k">_</span> <span class="o">+</span> <span class="k">_</span> <span class="o">}</span></code></pre></div>
<h4 id="rich-functions-1">Rich functions</h4>
<p>All transformations that take as argument a lambda function can
instead take as argument a <em>rich</em> function. For example, instead of</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="n">x</span><span class="o">.</span><span class="n">toInt</span> <span class="o">}</span></code></pre></div>
<p>you can write</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MyMapFunction</span> <span class="k">extends</span> <span class="nc">RichMapFunction</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">in</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span><span class="kt">Int</span> <span class="o">=</span> <span class="o">{</span> <span class="n">in</span><span class="o">.</span><span class="n">toInt</span> <span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>and pass the function to a <code>map</code> transformation:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">new</span> <span class="nc">MyMapFunction</span><span class="o">())</span></code></pre></div>
<p>Rich functions can also be defined as an anonymous class:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">data</span><span class="o">.</span><span class="n">map</span> <span class="o">(</span><span class="k">new</span> <span class="nc">RichMapFunction</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">in</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span><span class="kt">Int</span> <span class="o">=</span> <span class="o">{</span> <span class="n">in</span><span class="o">.</span><span class="n">toInt</span> <span class="o">}</span>
<span class="o">})</span></code></pre></div>
</div>
</div>
<p>In addition to the user-defined function (map, reduce, etc.), rich functions
provide four methods: <code>open</code>, <code>close</code>, <code>getRuntimeContext</code>, and
<code>setRuntimeContext</code>. These are useful for parameterizing the function
(see <a href="#passing-parameters-to-functions">Passing Parameters to Functions</a>),
creating and finalizing local state, accessing broadcast variables (see
<a href="#broadcast-variables">Broadcast Variables</a>), and accessing runtime
information such as accumulators and counters (see
<a href="#accumulators--counters">Accumulators and Counters</a>) and information
on iterations (see <a href="iterations.html">Iterations</a>).</p>
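<p>The role of <code>open</code> and <code>close</code> for creating and finalizing local state can be illustrated with a small plain-Java sketch. It assumes the usual lifecycle in which <code>open</code> runs once before the first element and <code>close</code> runs once after the last one. The classes <code>RichMapSketch</code>, <code>ParsingMapper</code>, and <code>RichFunctionLifecycleSketch</code> are hypothetical stand-ins, not Flink's <code>RichMapFunction</code>, and the runtime context methods are left out.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a rich map function with setup and teardown hooks.
abstract class RichMapSketch<IN, OUT> {
    void open() {}               // set up local state before any element is processed

    abstract OUT map(IN value);  // the user-defined function itself

    void close() {}              // release local state after the last element
}

class RichFunctionLifecycleSketch {
    // Example function that initializes its state in open() and records each call.
    static class ParsingMapper extends RichMapSketch<String, Integer> {
        final List<String> events = new ArrayList<>();
        private int radix;

        @Override
        void open() {
            radix = 10;
            events.add("open");
        }

        @Override
        Integer map(String value) {
            events.add("map");
            return Integer.parseInt(value, radix);
        }

        @Override
        void close() {
            events.add("close");
        }
    }

    // Drives the function the way a runtime might: open once, map each element, close once.
    static <IN, OUT> List<OUT> runAll(RichMapSketch<IN, OUT> function, List<IN> input) {
        List<OUT> output = new ArrayList<>();
        function.open();
        try {
            for (IN value : input) {
                output.add(function.map(value));
            }
        } finally {
            function.close();
        }
        return output;
    }

    public static void main(String[] args) {
        ParsingMapper mapper = new ParsingMapper();
        System.out.println(runAll(mapper, Arrays.asList("1", "2"))); // prints [1, 2]
        System.out.println(mapper.events); // prints [open, map, map, close]
    }
}
```

<p>Keeping setup in <code>open</code> rather than in the constructor means the state is created where the function actually runs, which matters once functions are shipped to parallel workers.</p>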
<p>In particular for the <code>reduceGroup</code> transformation, using a rich
function is the only way to define an optional <code>combine</code> function. See
the
<a href="dataset_transformations.html">transformations documentation</a>
for a complete example.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="data-types">Data Types</h2>
<p>Flink places some restrictions on the type of elements that are used in DataSets and as results
of transformations. The reason for this is that the system analyzes the types to determine
efficient execution strategies.</p>
<p>There are six different categories of data types:</p>
<ol>
<li><strong>Java Tuples</strong> and <strong>Scala Case Classes</strong></li>
<li><strong>Java POJOs</strong></li>
<li><strong>Primitive Types</strong></li>
<li><strong>Regular Classes</strong></li>
<li><strong>Values</strong></li>
<li><strong>Hadoop Writables</strong></li>
</ol>
<h4 id="tuples-and-case-classes">Tuples and Case Classes</h4>
<div class="codetabs">
<div data-lang="java">
<p>Tuples are composite types that contain a fixed number of fields with various types.
The Java API provides classes from <code>Tuple1</code> up to <code>Tuple25</code>. Every field of a tuple
can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
tuple can be accessed directly using the field’s name as <code>tuple.f4</code>, or using the generic getter method
<code>tuple.getField(int position)</code>. The field indices start at 0. Note that this stands in contrast
to the Scala tuples, but it is more consistent with Java’s general indexing.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
<span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="mi">1</span><span class="o">),</span>
<span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="s">&quot;world&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">));</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
<p>When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the <a href="#specifying-keys">key definition section</a> or <a href="#transformations">data transformation section</a> for details.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">wordCounts</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="c1">// also valid .groupBy(&quot;f0&quot;)</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyReduceFunction</span><span class="o">());</span></code></pre></div>
</div>
<div data-lang="scala">
<p>Scala case classes (and Scala tuples, which are a special case of case classes) are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as <code>_1</code> for the first field. Case class fields are accessed by their name.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">WordCount</span><span class="o">(</span><span class="n">word</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
<span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span>
<span class="nc">WordCount</span><span class="o">(</span><span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="mi">1</span><span class="o">),</span>
<span class="nc">WordCount</span><span class="o">(</span><span class="s">&quot;world&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">))</span> <span class="c1">// Case Class Data Set</span>
<span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">).</span><span class="n">reduce</span><span class="o">(...)</span> <span class="c1">// group by field expression &quot;word&quot;</span>
<span class="k">val</span> <span class="n">input2</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">((</span><span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="mi">1</span><span class="o">),</span> <span class="o">(</span><span class="s">&quot;world&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">))</span> <span class="c1">// Tuple2 Data Set</span>
<span class="n">input2</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">1</span><span class="o">).</span><span class="n">reduce</span><span class="o">(...)</span> <span class="c1">// group by field positions 0 and 1</span></code></pre></div>
<p>When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the <a href="#specifying-keys">key definition section</a> or <a href="#transformations">data transformation section</a> for details.</p>
</div>
</div>
<h4 id="pojos">POJOs</h4>
<p>Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:</p>
<ul>
<li>
<p>The class must be public.</p>
</li>
<li>
<p>It must have a public constructor without arguments (default constructor).</p>
</li>
<li>
<p>All fields are either public or must be accessible through getter and setter functions. For a field called <code>foo</code> the getter and setter methods must be named <code>getFoo()</code> and <code>setFoo()</code>.</p>
</li>
<li>
<p>The type of a field must be supported by Flink. At the moment, Flink uses <a href="http://avro.apache.org">Avro</a> to serialize arbitrary objects (such as <code>Date</code>).</p>
</li>
</ul>
<p>Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result, POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.</p>
<p>The following example shows a simple POJO with two public fields.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordWithCount</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span>
<span class="kd">public</span> <span class="nf">WordWithCount</span><span class="o">()</span> <span class="o">{}</span>
<span class="kd">public</span> <span class="nf">WordWithCount</span><span class="o">(</span><span class="n">String</span> <span class="n">word</span><span class="o">,</span> <span class="kt">int</span> <span class="n">count</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">word</span> <span class="o">=</span> <span class="n">word</span><span class="o">;</span>
<span class="k">this</span><span class="o">.</span><span class="na">count</span> <span class="o">=</span> <span class="n">count</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">WordWithCount</span><span class="o">(</span><span class="k">var</span> <span class="n">word</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">var</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="o">{</span>
<span class="k">def</span> <span class="k">this</span><span class="o">()</span> <span class="o">{</span>
<span class="k">this</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="o">-</span><span class="mi">1</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
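<p>The getter and setter requirement listed above can be illustrated with a variant of the same class that keeps its fields private. This is a minimal sketch; the class name <code>WordWithCountBean</code> is hypothetical and not part of Flink.</p>

```java
// Hypothetical variant of WordWithCount that keeps its fields private.
public class WordWithCountBean {
    private String word;
    private int count;

    // Public constructor without arguments, as required for POJO types.
    public WordWithCountBean() {}

    public WordWithCountBean(String word, int count) {
        this.word = word;
        this.count = count;
    }

    // Getter and setter for the field "word" follow the getFoo()/setFoo() naming rule.
    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }

    // Getter and setter for the field "count".
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
}
```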
<p>When grouping, sorting, or joining a data set of POJO types, keys can be specified with field expressions. See the <a href="#specifying-keys">key definition section</a> or <a href="#transformations">data transformation section</a> for details.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">wordCounts</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">)</span> <span class="c1">// group by field expression &quot;word&quot;</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyReduceFunction</span><span class="o">());</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">wordCounts</span> <span class="n">groupBy</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">word</span> <span class="o">}</span> <span class="n">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nc">MyReduceFunction</span><span class="o">())</span></code></pre></div>
</div>
</div>
<h4 id="primitive-types">Primitive Types</h4>
<p>Flink supports all Java and Scala primitive types such as <code>Integer</code>, <code>String</code>, and <code>Double</code>.</p>
<h4 id="general-class-types">General Class Types</h4>
<p>Flink supports most Java and Scala classes (API and custom).
Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
resources. Classes that follow the Java Beans conventions work well in general.</p>
<p>All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types.
Flink treats these data types as black boxes and is not able to access their content (e.g., for efficient sorting). General types are de/serialized using the serialization framework <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>.</p>
<p>When grouping, sorting, or joining a data set of generic types, keys must be specified with key selector functions. See the <a href="#specifying-keys">key definition section</a> or <a href="#transformations">data transformation section</a> for details.</p>
<h4 id="values">Values</h4>
<p><em>Value</em> types describe their serialization and deserialization manually. Instead of going through a
general purpose serialization framework, they provide custom code for those operations by means of
implementing the <code>org.apache.flink.types.Value</code> interface with the methods <code>read</code> and <code>write</code>. Using
a Value type is reasonable when general purpose serialization would be highly inefficient. An
example would be a data type that implements a sparse vector of elements as an array. Knowing that
the array is mostly zero, one can use a special encoding for the non-zero elements, while the
general purpose serialization would simply write all array elements.</p>
<p>The <code>org.apache.flink.types.CopyableValue</code> interface supports manual internal cloning logic in a
similar way.</p>
<p>Flink comes with pre-defined Value types that correspond to basic data types (<code>ByteValue</code>,
<code>ShortValue</code>, <code>IntValue</code>, <code>LongValue</code>, <code>FloatValue</code>, <code>DoubleValue</code>, <code>StringValue</code>, <code>CharValue</code>,
<code>BooleanValue</code>). These Value types act as mutable variants of the basic data types: their value can
be altered, allowing programmers to reuse objects and take pressure off the garbage collector.</p>
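<p>The sparse vector case mentioned above can be sketched in plain Java. This is only an illustration under stated assumptions: the class <code>SparseVector</code> and its helpers <code>toBytes</code> and <code>fromBytes</code> are hypothetical, the class does not implement Flink's <code>Value</code> interface, and <code>java.io.DataOutput</code> and <code>java.io.DataInput</code> stand in for the views that Flink would hand to <code>write</code> and <code>read</code>.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sparse vector with hand-written serialization logic.
public class SparseVector {
    private double[] values;

    public SparseVector() { this(new double[0]); }

    public SparseVector(double[] values) { this.values = values.clone(); }

    public double[] toArray() { return values.clone(); }

    // Writes the array length, the number of non-zero entries,
    // and then one (index, value) pair per non-zero entry.
    public void write(DataOutput out) throws IOException {
        int nonZero = 0;
        for (double v : values) {
            if (v != 0.0) nonZero++;
        }
        out.writeInt(values.length);
        out.writeInt(nonZero);
        for (int i = 0; i < values.length; i++) {
            if (values[i] != 0.0) {
                out.writeInt(i);
                out.writeDouble(values[i]);
            }
        }
    }

    // Rebuilds the dense array; positions that were not written stay zero.
    public void read(DataInput in) throws IOException {
        int length = in.readInt();
        int nonZero = in.readInt();
        values = new double[length];
        for (int k = 0; k < nonZero; k++) {
            int index = in.readInt();
            values[index] = in.readDouble();
        }
    }

    // Helper for experiments: serializes a vector into a byte array.
    public static byte[] toBytes(SparseVector vector) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            vector.write(new DataOutputStream(buffer));
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for experiments: restores a vector from a byte array.
    public static SparseVector fromBytes(byte[] bytes) {
        try {
            SparseVector vector = new SparseVector();
            vector.read(new DataInputStream(new ByteArrayInputStream(bytes)));
            return vector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

<p>With this encoding, a vector of 1000 entries that has two non-zero values takes 32 bytes (two 4-byte integers as header plus 12 bytes per non-zero entry), whereas writing every element would take 8000 bytes for the values alone.</p>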
<h4 id="hadoop-writables">Hadoop Writables</h4>
<p>You can use types that implement the <code>org.apache.hadoop.io.Writable</code> interface. The serialization logic
defined in the <code>write()</code> and <code>readFields()</code> methods will be used for serialization.</p>
<h4 id="type-erasure--type-inference">Type Erasure &amp; Type Inference</h4>
<p><em>Note: This section is only relevant for Java.</em></p>
<p>The Java compiler throws away much of the generic type information after compilation. This is
known as <em>type erasure</em> in Java. It means that at runtime, an instance of an object does not know
its generic type any more. For example, instances of <code>DataSet&lt;String&gt;</code> and <code>DataSet&lt;Long&gt;</code> look the
same to the JVM.</p>
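<p>Type erasure can be observed with plain JDK collections, without any Flink classes. The following minimal sketch uses a hypothetical class <code>TypeErasureDemo</code> and compares the runtime classes of two lists that differ only in their type argument.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; it uses only JDK types.
public class TypeErasureDemo {

    // Returns true if a List<String> and a List<Long> share the same runtime class.
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        // The type arguments exist only at compile time; both objects are plain ArrayLists.
        return strings.getClass() == longs.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
    }
}
```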
<p>Flink requires type information at the time when it prepares the program for execution (when the
main method of the program is called). The Flink Java API tries to reconstruct the type information
that was thrown away in various ways and store it explicitly in the data sets and operators. You can
retrieve the type via <code>DataSet.getType()</code>. The method returns an instance of <code>TypeInformation</code>,
which is Flink’s internal way of representing types.</p>
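<p>One general JVM mechanism that makes such a reconstruction possible is that class files keep the generic signatures of declared supertypes, which can be read through reflection. The sketch below illustrates only this mechanism and is not Flink's type extraction code; the interface <code>Mapper</code>, the class <code>Length</code>, and the method <code>typeArguments</code> are hypothetical names.</p>

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical demo class; it uses only JDK reflection.
public class GenericSignatureDemo {

    // Hypothetical stand-in for a generic user function interface.
    public interface Mapper<I, O> {
        O map(I value);
    }

    // A concrete function whose type arguments are fixed in its class declaration.
    public static class Length implements Mapper<String, Integer> {
        @Override
        public Integer map(String value) {
            return value.length();
        }
    }

    // Returns the type arguments that the given class declares for Mapper,
    // or an empty array if the class does not directly implement Mapper with type arguments.
    public static Type[] typeArguments(Class<?> clazz) {
        for (Type implemented : clazz.getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) implemented;
                if (parameterized.getRawType() == Mapper.class) {
                    return parameterized.getActualTypeArguments();
                }
            }
        }
        return new Type[0];
    }
}
```

<p>For <code>Length</code>, the returned array holds <code>String.class</code> and <code>Integer.class</code>. A class that implements the interface without fixing its type arguments does not reveal them this way, which is one reason why extra type information is sometimes needed.</p>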
<p>Type inference has its limits and needs the “cooperation” of the programmer in some cases.
Examples are methods that create data sets from collections, such as
<code>ExecutionEnvironment.fromCollection()</code>, where you can pass an argument that describes the type. Generic
functions like <code>MapFunction&lt;I, O&gt;</code> may also need extra type information.</p>
<p>The
<a href="https://github.com/apache/flink/blob/master//flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java">ResultTypeQueryable</a>
interface can be implemented by input formats and functions to tell the API
explicitly about their return type. The <em>input types</em> that the functions are invoked with can
usually be inferred from the result types of the previous operations.</p>
<h4 id="object-reuse-behavior">Object reuse behavior</h4>
<p>Apache Flink tries to reduce the number of object allocations for better performance.</p>
<p>By default, user-defined functions (like <code>map()</code> or <code>groupReduce()</code>) receive new objects on each call (or through an iterator). It is therefore possible to keep references to the objects inside the function (for example, in a list).</p>
<p>User-defined functions are often chained, for example when two mappers with the same parallelism are defined one after another. In the chaining case, the functions in the chain receive the same object instances, so the second <code>map()</code> function receives the very objects that the first <code>map()</code> returns.
This behavior can lead to errors when the first <code>map()</code> function keeps a list of all objects and the second mapper modifies them. In that case, the user has to manually create copies of the objects before putting them into the list.</p>
<p>Also note that the system assumes that the user is not modifying the incoming objects in the <code>filter()</code> function.</p>
<p>The <code>ExecutionConfig</code> has a switch that enables the object reuse mode (<code>enableObjectReuse()</code>). In this mode, Flink reuses object instances for mutable types. In practice, this means that a <code>map()</code> function will always receive the same object instance (with its fields set to new values). Object reuse mode improves performance because fewer objects are created, but users have to take care of what they do with the object references.</p>
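<p>The hazard of keeping references to reused objects can be reproduced in plain Java. The following sketch does not use Flink; the class <code>ObjectReuseDemo</code> and its nested type <code>Counter</code> are hypothetical, and the loop only imitates a runtime that hands the same mutable instance to a function again and again.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; the loop imitates a runtime that reuses one object.
public class ObjectReuseDemo {

    // Hypothetical mutable record type.
    public static class Counter {
        public int value;
    }

    // Collects three records and returns the values seen afterwards.
    // With copy == false the references to the reused instance are stored directly.
    public static List<Integer> collect(boolean copy) {
        Counter reused = new Counter();
        List<Counter> kept = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            reused.value = i; // the same instance is overwritten with new field values
            if (copy) {
                Counter snapshot = new Counter();
                snapshot.value = reused.value;
                kept.add(snapshot); // safe: an independent copy is stored
            } else {
                kept.add(reused); // unsafe: every list entry points to the same object
            }
        }
        List<Integer> result = new ArrayList<>();
        for (Counter counter : kept) {
            result.add(counter.value);
        }
        return result;
    }
}
```

<p>Here <code>collect(false)</code> returns the last value three times, while <code>collect(true)</code> returns 1, 2, and 3, which is why objects must be copied before they are stored when instances are reused.</p>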
<p><a href="#top">Back to top</a></p>
<h2 id="data-sources">Data Sources</h2>
<div class="codetabs">
<div data-lang="java">
<p>Data sources create the initial data sets, such as from files or from Java collections. The general
mechanism of creating data sets is abstracted behind an
<a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java">InputFormat</a>.
Flink comes
with several built-in formats to create data sets from common file formats. Many of them have
shortcut methods on the <em>ExecutionEnvironment</em>.</p>
<p>File-based:</p>
<ul>
<li>
<p><code>readTextFile(path)</code> / <code>TextInputFormat</code> - Reads files line by line and returns them as Strings.</p>
</li>
<li>
<p><code>readTextFileWithValue(path)</code> / <code>TextValueInputFormat</code> - Reads files line by line and returns them as
StringValues. StringValues are mutable strings.</p>
</li>
<li>
<p><code>readCsvFile(path)</code> / <code>CsvInputFormat</code> - Parses files of comma (or another char) delimited fields.
Returns a DataSet of tuples or POJOs. Supports the basic Java types and their Value counterparts as field
types.</p>
</li>
<li>
<p><code>readFileOfPrimitives(path, Class)</code> / <code>PrimitiveInputFormat</code> - Parses files of new-line (or another char sequence) delimited primitive data types such as <code>String</code> or <code>Integer</code>.</p>
</li>
</ul>
<p>Collection-based:</p>
<ul>
<li>
<p><code>fromCollection(Collection)</code> - Creates a data set from a <code>java.util.Collection</code>. All elements
in the collection must be of the same type.</p>
</li>
<li>
<p><code>fromCollection(Iterator, Class)</code> - Creates a data set from an iterator. The class specifies the
data type of the elements returned by the iterator.</p>
</li>
<li>
<p><code>fromElements(T ...)</code> - Creates a data set from the given sequence of objects. All objects must be
of the same type.</p>
</li>
<li>
<p><code>fromParallelCollection(SplittableIterator, Class)</code> - Creates a data set from an iterator, in
parallel. The class specifies the data type of the elements returned by the iterator.</p>
</li>
<li>
<p><code>generateSequence(from, to)</code> - Generates the sequence of numbers in the given interval, in
parallel.</p>
</li>
</ul>
<p>Generic:</p>
<ul>
<li>
<p><code>readFile(inputFormat, path)</code> / <code>FileInputFormat</code> - Accepts a file input format.</p>
</li>
<li>
<p><code>createInput(inputFormat)</code> / <code>InputFormat</code> - Accepts a generic input format.</p>
</li>
</ul>
<p><strong>Examples</strong></p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// read text file from local file system</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">localLines</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="s">&quot;file:///path/to/my/textfile&quot;</span><span class="o">);</span>
<span class="c1">// read text file from a HDFS running at nnHost:nnPort</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">hdfsLines</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="s">&quot;hdfs://nnHost:nnPort/path/to/my/textfile&quot;</span><span class="o">);</span>
<span class="c1">// read a CSV file with three fields</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">csvInput</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readCsvFile</span><span class="o">(</span><span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">types</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Double</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="c1">// read a CSV file with five fields, taking only two of them</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">csvInput</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readCsvFile</span><span class="o">(</span><span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">includeFields</span><span class="o">(</span><span class="s">&quot;10010&quot;</span><span class="o">)</span> <span class="c1">// take the first and the fourth field</span>
<span class="o">.</span><span class="na">types</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Double</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="c1">// read a CSV file with three fields into a POJO (Person.class) with corresponding fields</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Person</span><span class="o">&gt;</span> <span class="n">csvInput</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readCsvFile</span><span class="o">(</span><span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">pojoType</span><span class="o">(</span><span class="n">Person</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="s">&quot;name&quot;</span><span class="o">,</span> <span class="s">&quot;age&quot;</span><span class="o">,</span> <span class="s">&quot;zipcode&quot;</span><span class="o">);</span>
<span class="c1">// create a set from some given elements</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">value</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="s">&quot;Foo&quot;</span><span class="o">,</span> <span class="s">&quot;bar&quot;</span><span class="o">,</span> <span class="s">&quot;foobar&quot;</span><span class="o">,</span> <span class="s">&quot;fubar&quot;</span><span class="o">);</span>
<span class="c1">// generate a number sequence</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">numbers</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">generateSequence</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">10000000</span><span class="o">);</span>
<span class="c1">// Read data from a relational database using the JDBC input format</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">dbData</span> <span class="o">=</span>
<span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span>
<span class="c1">// create and configure input format</span>
<span class="n">JDBCInputFormat</span><span class="o">.</span><span class="na">buildJDBCInputFormat</span><span class="o">()</span>
<span class="o">.</span><span class="na">setDrivername</span><span class="o">(</span><span class="s">&quot;org.apache.derby.jdbc.EmbeddedDriver&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">setDBUrl</span><span class="o">(</span><span class="s">&quot;jdbc:derby:memory:persons&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">setQuery</span><span class="o">(</span><span class="s">&quot;select name, age from persons&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">finish</span><span class="o">(),</span>
<span class="c1">// specify type information for DataSet</span>
<span class="k">new</span> <span class="nf">TupleTypeInfo</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">STRING_TYPE_INFO</span><span class="o">,</span> <span class="n">INT_TYPE_INFO</span><span class="o">)</span>
<span class="o">);</span>
<span class="c1">// Note: Flink&#39;s program compiler needs to infer the data types of the data items which are returned</span>
<span class="c1">// by an InputFormat. If this information cannot be automatically inferred, it is necessary to</span>
<span class="c1">// manually provide the type information as shown in the examples above.</span></code></pre></div>
<h4 id="configuring-csv-parsing">Configuring CSV Parsing</h4>
<p>Flink offers a number of configuration options for CSV parsing:</p>
<ul>
<li>
<p><code>types(Class ... types)</code> specifies the types of the fields to parse. <strong>It is mandatory to configure the types of the parsed fields.</strong>
In case of the type class Boolean.class, “True” (case-insensitive), “False” (case-insensitive), “1” and “0” are treated as booleans.</p>
</li>
<li>
<p><code>lineDelimiter(String del)</code> specifies the delimiter of individual records. The default line delimiter is the new-line character <code>'\n'</code>.</p>
</li>
<li>
<p><code>fieldDelimiter(String del)</code> specifies the delimiter that separates fields of a record. The default field delimiter is the comma character <code>','</code>.</p>
</li>
<li>
<p><code>includeFields(boolean ... flag)</code>, <code>includeFields(String mask)</code>, or <code>includeFields(long bitMask)</code> defines which fields to read from the input file (and which to ignore). By default the first <em>n</em> fields (as defined by the number of types in the <code>types()</code> call) are parsed.</p>
</li>
<li>
<p><code>parseQuotedStrings(char quoteChar)</code> enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or trailing whitespace is <em>not</em> trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character. If quoted string parsing is enabled and the first character of the field is <em>not</em> the quote character, the string is parsed as an unquoted string. By default, quoted string parsing is disabled.</p>
</li>
<li>
<p><code>ignoreComments(String commentPrefix)</code> specifies a comment prefix. All lines that start with the specified comment prefix are skipped and not parsed. By default, no lines are ignored.</p>
</li>
<li>
<p><code>ignoreInvalidLines()</code> enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.</p>
</li>
<li>
<p><code>ignoreFirstLine()</code> configures the InputFormat to ignore the first line of the input file. By default no line is ignored.</p>
</li>
</ul>
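<p>The string mask accepted by <code>includeFields(String mask)</code> is read from left to right with one character per field, as in the <code>"10010"</code> mask of the example above, which takes the first and the fourth field. The following sketch shows that reading in plain Java. It is not Flink's parser: the class <code>IncludeMaskDemo</code> is hypothetical, and accepting only the characters <code>'0'</code> and <code>'1'</code> is an assumption made for this illustration.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that interprets an include mask such as "10010".
public class IncludeMaskDemo {

    // Returns the 0-based positions of all fields that are marked with '1'.
    public static List<Integer> includedPositions(String mask) {
        List<Integer> positions = new ArrayList<>();
        for (int i = 0; i < mask.length(); i++) {
            char flag = mask.charAt(i);
            if (flag == '1') {
                positions.add(i); // field at position i is read
            } else if (flag != '0') {
                // assumption of this sketch: only '0' and '1' are accepted
                throw new IllegalArgumentException("Unexpected mask character: " + flag);
            }
        }
        return positions;
    }
}
```

<p>For the mask <code>"10010"</code> the method returns the positions 0 and 3, i.e., the first and the fourth field.</p>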
<h4 id="recursive-traversal-of-the-input-path-directory">Recursive Traversal of the Input Path Directory</h4>
<p>For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the <code>recursive.file.enumeration</code> configuration parameter, as in the following example.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// enable recursive enumeration of nested input files</span>
<span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// create a configuration object</span>
<span class="n">Configuration</span> <span class="n">parameters</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Configuration</span><span class="o">();</span>
<span class="c1">// set the recursive enumeration parameter</span>
<span class="n">parameters</span><span class="o">.</span><span class="na">setBoolean</span><span class="o">(</span><span class="s">&quot;recursive.file.enumeration&quot;</span><span class="o">,</span> <span class="kc">true</span><span class="o">);</span>
<span class="c1">// pass the configuration to the data source</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">logs</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="s">&quot;file:///path/with.nested/files&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">withParameters</span><span class="o">(</span><span class="n">parameters</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<p>Data sources create the initial data sets, such as from files or from Java collections. The general
mechanism of creating data sets is abstracted behind an
<a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java">InputFormat</a>.
Flink comes
with several built-in formats to create data sets from common file formats. Many of them have
shortcut methods on the <em>ExecutionEnvironment</em>.</p>
<p>File-based:</p>
<ul>
<li>
<p><code>readTextFile(path)</code> / <code>TextInputFormat</code> - Reads files line by line and returns them as Strings.</p>
</li>
<li>
<p><code>readTextFileWithValue(path)</code> / <code>TextValueInputFormat</code> - Reads files line by line and returns them as
StringValues. StringValues are mutable strings.</p>
</li>
<li>
<p><code>readCsvFile(path)</code> / <code>CsvInputFormat</code> - Parses files of comma (or another char) delimited fields.
Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic Java types and their Value counterparts as field
types.</p>
</li>
</ul>
<p>Collection-based:</p>
<ul>
<li>
<p><code>fromCollection(Seq)</code> - Creates a data set from a Seq. All elements
in the collection must be of the same type.</p>
</li>
<li>
<p><code>fromCollection(Iterator)</code> - Creates a data set from an Iterator. The data type of the
elements is derived from the iterator's element type.</p>
</li>
<li>
<p><code>fromElements(elements: _*)</code> - Creates a data set from the given sequence of objects. All objects
must be of the same type.</p>
</li>
<li>
<p><code>fromParallelCollection(SplittableIterator)</code> - Creates a data set from an iterator, in
parallel. The data type of the elements is derived from the iterator's element type.</p>
</li>
<li>
<p><code>generateSequence(from, to)</code> - Generates the sequence of numbers in the given interval, in
parallel.</p>
</li>
</ul>
<p>Generic:</p>
<ul>
<li>
<p><code>readFile(inputFormat, path)</code> / <code>FileInputFormat</code> - Accepts a file input format.</p>
</li>
<li>
<p><code>createInput(inputFormat)</code> / <code>InputFormat</code> - Accepts a generic input format.</p>
</li>
</ul>
<p><strong>Examples</strong></p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="c1">// read text file from local file system</span>
<span class="k">val</span> <span class="n">localLines</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readTextFile</span><span class="o">(</span><span class="s">&quot;file:///path/to/my/textfile&quot;</span><span class="o">)</span>
<span class="c1">// read text file from a HDFS running at nnHost:nnPort</span>
<span class="k">val</span> <span class="n">hdfsLines</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readTextFile</span><span class="o">(</span><span class="s">&quot;hdfs://nnHost:nnPort/path/to/my/textfile&quot;</span><span class="o">)</span>
<span class="c1">// read a CSV file with three fields</span>
<span class="k">val</span> <span class="n">csvInput</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readCsvFile</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)](</span><span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="o">)</span>
<span class="c1">// read a CSV file with five fields, taking only two of them</span>
<span class="k">val</span> <span class="n">csvInput</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readCsvFile</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Double</span><span class="o">)](</span>
<span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="o">,</span>
<span class="n">includedFields</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">3</span><span class="o">))</span> <span class="c1">// take the first and the fourth field</span>
<span class="c1">// CSV input can also be used with Case Classes</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">MyCaseClass</span><span class="o">(</span><span class="n">str</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">dbl</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
<span class="k">val</span> <span class="n">csvInput</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readCsvFile</span><span class="o">[</span><span class="kt">MyCaseClass</span><span class="o">](</span>
<span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="o">,</span>
<span class="n">includedFields</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">3</span><span class="o">))</span> <span class="c1">// take the first and the fourth field</span>
<span class="c1">// read a CSV file with three fields into a POJO (Person) with corresponding fields</span>
<span class="k">val</span> <span class="n">csvInput</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readCsvFile</span><span class="o">[</span><span class="kt">Person</span><span class="o">](</span>
<span class="s">&quot;hdfs:///the/CSV/file&quot;</span><span class="o">,</span>
<span class="n">pojoFields</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">,</span> <span class="s">&quot;age&quot;</span><span class="o">,</span> <span class="s">&quot;zipcode&quot;</span><span class="o">))</span>
<span class="c1">// create a set from some given elements</span>
<span class="k">val</span> <span class="n">values</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="s">&quot;Foo&quot;</span><span class="o">,</span> <span class="s">&quot;bar&quot;</span><span class="o">,</span> <span class="s">&quot;foobar&quot;</span><span class="o">,</span> <span class="s">&quot;fubar&quot;</span><span class="o">)</span>
<span class="c1">// generate a number sequence</span>
<span class="k">val</span> <span class="n">numbers</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">generateSequence</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">10000000</span><span class="o">);</span></code></pre></div>
<h4 id="configuring-csv-parsing-1">Configuring CSV Parsing</h4>
<p>Flink offers a number of configuration options for CSV parsing:</p>
<ul>
<li>
<p><code>lineDelimiter: String</code> specifies the delimiter of individual records. The default line delimiter is the new-line character <code>'\n'</code>.</p>
</li>
<li>
<p><code>fieldDelimiter: String</code> specifies the delimiter that separates fields of a record. The default field delimiter is the comma character <code>','</code>.</p>
</li>
<li>
<p><code>includedFields: Array[Int]</code> defines which fields to read from the input file (and which to ignore). By default the first <em>n</em> fields (as defined by the number of types in the <code>types()</code> call) are parsed.</p>
</li>
<li>
<p><code>pojoFields: Array[String]</code> specifies the fields of a POJO that are mapped to CSV fields. Parsers for CSV fields are automatically initialized based on the type and order of the POJO fields.</p>
</li>
<li>
<p><code>parseQuotedStrings: Character</code> enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or trailing whitespace is <em>not</em> trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character. If quoted string parsing is enabled and the first character of the field is <em>not</em> the quote character, the string is parsed as an unquoted string. By default, quoted string parsing is disabled.</p>
</li>
<li>
<p><code>ignoreComments: String</code> specifies a comment prefix. All lines that start with the specified comment prefix are skipped and not parsed. By default, no lines are ignored.</p>
</li>
<li>
<p><code>lenient: Boolean</code> enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.</p>
</li>
<li>
<p><code>ignoreFirstLine: Boolean</code> configures the InputFormat to ignore the first line of the input file. By default no line is ignored.</p>
</li>
</ul>
<h4 id="recursive-traversal-of-the-input-path-directory-1">Recursive Traversal of the Input Path Directory</h4>
<p>For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the <code>recursive.file.enumeration</code> configuration parameter, as in the following example.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// enable recursive enumeration of nested input files</span>
<span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="c1">// create a configuration object</span>
<span class="k">val</span> <span class="n">parameters</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Configuration</span>
<span class="c1">// set the recursive enumeration parameter</span>
<span class="n">parameters</span><span class="o">.</span><span class="n">setBoolean</span><span class="o">(</span><span class="s">&quot;recursive.file.enumeration&quot;</span><span class="o">,</span> <span class="kc">true</span><span class="o">)</span>
<span class="c1">// pass the configuration to the data source</span>
<span class="n">env</span><span class="o">.</span><span class="n">readTextFile</span><span class="o">(</span><span class="s">&quot;file:///path/with.nested/files&quot;</span><span class="o">).</span><span class="n">withParameters</span><span class="o">(</span><span class="n">parameters</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h3 id="read-compressed-files">Read Compressed Files</h3>
<p>Flink currently supports transparent decompression of input files if these are marked with an appropriate file extension. In particular, this means that no further configuration of the input formats is necessary and any <code>FileInputFormat</code> supports the compression, including custom input formats. Note that compressed files might not be read in parallel, which limits job scalability.</p>
<p>The following table lists the currently supported compression methods.</p>
<p><br /></p>
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Compression method</th>
<th class="text-left">File extensions</th>
<th class="text-left" style="width: 20%">Parallelizable</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>DEFLATE</strong></td>
<td><code>.deflate</code></td>
<td>no</td>
</tr>
<tr>
<td><strong>GZip</strong></td>
<td><code>.gz</code>, <code>.gzip</code></td>
<td>no</td>
</tr>
</tbody>
</table>
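<p>To illustrate what choosing a decompressor by file extension can look like, here is a minimal plain-Java sketch. It is not Flink's implementation: the class <code>ExtensionDecompressionSketch</code> and its methods are hypothetical, and mapping <code>.deflate</code> to <code>java.util.zip.InflaterInputStream</code> is an assumption made for the example.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical helper, not part of Flink.
public class ExtensionDecompressionSketch {

    /** Wraps the raw stream in a decompressor chosen from the file extension. */
    public static InputStream open(String fileName, InputStream raw) throws IOException {
        if (fileName.endsWith(&quot;.gz&quot;) || fileName.endsWith(&quot;.gzip&quot;)) {
            return new GZIPInputStream(raw);
        }
        if (fileName.endsWith(&quot;.deflate&quot;)) {
            // assumption: the file holds a zlib-wrapped DEFLATE stream
            return new InflaterInputStream(raw);
        }
        return raw; // unknown extension: read the bytes as they are
    }

    /** Reads the whole (possibly compressed) content as a UTF-8 string. */
    public static String readAll(String fileName, byte[] content) throws IOException {
        try (InputStream in = open(fileName, new ByteArrayInputStream(content))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** Helper for the demo: gzip-compresses a string in memory. */
    public static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll(&quot;part-0.gz&quot;, gzip(&quot;hello&quot;))); // prints &quot;hello&quot;
        System.out.println(readAll(&quot;part-0.txt&quot;, &quot;hello&quot;.getBytes(StandardCharsets.UTF_8))); // prints &quot;hello&quot;
    }
}</code></pre></div>
<p>The sketch reads each stream sequentially from its first byte, which is consistent with the "Parallelizable: no" entries in the table above.</p>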
<p><a href="#top">Back to top</a></p>
<h2 id="execution-configuration">Execution Configuration</h2>
<p>The <code>ExecutionEnvironment</code> also contains the <code>ExecutionConfig</code>, which allows you to set job-specific configuration values for the runtime.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">ExecutionConfig</span> <span class="n">executionConfig</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">();</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="k">val</span> <span class="n">executionConfig</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">getConfig</span></code></pre></div>
</div>
</div>
<p>The following configuration options are available (defaults are shown in bold):</p>
<ul>
<li>
<p><strong><code>enableClosureCleaner()</code></strong> / <code>disableClosureCleaner()</code>. The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs.
With the closure cleaner disabled, an anonymous user function might reference the surrounding class, which is usually not Serializable. This leads to exceptions in the serializer.</p>
</li>
<li>
<p><code>getParallelism()</code> / <code>setParallelism(int parallelism)</code> Set the default parallelism for the job.</p>
</li>
<li>
<p><code>getNumberOfExecutionRetries()</code> / <code>setNumberOfExecutionRetries(int numberOfExecutionRetries)</code> Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of <code>-1</code> indicates that the system default value (as defined in the configuration) should be used.</p>
</li>
<li>
<p><code>getExecutionMode()</code> / <code>setExecutionMode()</code>. The default execution mode is PIPELINED. Sets the execution mode used to execute the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner.</p>
</li>
<li>
<p><code>enableForceKryo()</code> / <strong><code>disableForceKryo()</code></strong>. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOs even though they could be analyzed as POJOs. In some cases this might be preferable, for example when Flink's internal serializers fail to handle a POJO properly.</p>
</li>
<li>
<p><code>enableForceAvro()</code> / <strong><code>disableForceAvro()</code></strong>. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs.</p>
</li>
<li>
<p><code>enableObjectReuse()</code> / <strong><code>disableObjectReuse()</code></strong> By default, objects are not reused in Flink. Enabling the <a href="programming_guide.html#object-reuse-behavior">object reuse mode</a> will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.</p>
</li>
<li>
<p><strong><code>enableSysoutLogging()</code></strong> / <code>disableSysoutLogging()</code> JobManager status updates are printed to <code>System.out</code> by default. This setting allows you to disable this behavior.</p>
</li>
<li>
<p><code>getGlobalJobParameters()</code> / <code>setGlobalJobParameters()</code> This method allows users to set custom objects as a global configuration for the job. Since the <code>ExecutionConfig</code> is accessible in all user defined functions, this is an easy method for making configuration globally available in a job.</p>
</li>
<li>
<p><code>addDefaultKryoSerializer(Class&lt;?&gt; type, Serializer&lt;?&gt; serializer)</code> Register a Kryo serializer instance for the given <code>type</code>.</p>
</li>
<li>
<p><code>addDefaultKryoSerializer(Class&lt;?&gt; type, Class&lt;? extends Serializer&lt;?&gt;&gt; serializerClass)</code> Register a Kryo serializer class for the given <code>type</code>.</p>
</li>
<li>
<p><code>registerTypeWithKryoSerializer(Class&lt;?&gt; type, Serializer&lt;?&gt; serializer)</code> Register the given type with Kryo and specify a serializer for it. By registering a type with Kryo, the serialization of the type will be much more efficient.</p>
</li>
<li>
<p><code>registerKryoType(Class&lt;?&gt; type)</code> If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags (integer IDs) are written. If a type is not registered with Kryo, its entire class-name will be serialized with every instance, leading to much higher I/O costs.</p>
</li>
<li>
<p><code>registerPojoType(Class&lt;?&gt; type)</code> Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written. If a type is not registered with Kryo, its entire class-name will be serialized with every instance, leading to much higher I/O costs.</p>
</li>
</ul>
<p>Note that types registered with <code>registerKryoType()</code> are not available to Flink's Kryo serializer instance.</p>
<ul>
<li><code>disableAutoTypeRegistration()</code> Automatic type registration is enabled by default. Automatic type registration registers all types (including sub-types) used by user code with Kryo and the POJO serializer.</li>
</ul>
<p>The <code>RuntimeContext</code>, which is accessible in <code>Rich*</code> functions through the <code>getRuntimeContext()</code> method, also provides access to the <code>ExecutionConfig</code> in all user-defined functions.</p>
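<p>The serialization problem mentioned for the closure cleaner above can be reproduced in plain Java without Flink. The following is a minimal sketch, not Flink code: <code>ClosureCaptureSketch</code>, <code>Mapper</code>, and <code>StaticMapper</code> are hypothetical names. To make the capture explicit, the anonymous function here reads a field of the enclosing object, so it holds a reference to an instance that is not Serializable.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical enclosing class; deliberately not Serializable.
public class ClosureCaptureSketch {

    /** Hypothetical stand-in for a serializable user function. */
    public interface Mapper extends Serializable {
        String map(String value);
    }

    private final String prefix;

    public ClosureCaptureSketch(String prefix) {
        this.prefix = prefix;
    }

    /** Anonymous function that reads an outer field and therefore references the enclosing object. */
    public Mapper anonymousMapper() {
        return new Mapper() {
            @Override
            public String map(String value) {
                return prefix + value;
            }
        };
    }

    /** Static nested function that copies what it needs and holds no outer reference. */
    public static class StaticMapper implements Mapper {
        private final String prefix;

        public StaticMapper(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }

    /** Returns true if Java serialization accepts the object. */
    public static boolean isSerializable(Object candidate) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false; // some referenced object is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new ClosureCaptureSketch(&quot;x-&quot;).anonymousMapper())); // prints &quot;false&quot;
        System.out.println(isSerializable(new StaticMapper(&quot;x-&quot;))); // prints &quot;true&quot;
    }
}</code></pre></div>
<p>Writing user functions as static nested or top-level classes avoids the outer reference regardless of how the closure cleaner is configured.</p>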
<p><a href="#top">Back to top</a></p>
<h2 id="data-sinks">Data Sinks</h2>
<div class="codetabs">
<div data-lang="java">
<p>Data sinks consume DataSets and are used to store or return them. Data sink operations are described
using an
<a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java">OutputFormat</a>.
Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
DataSet:</p>
<ul>
<li><code>writeAsText()</code> / <code>TextOutputFormat</code> - Writes elements line-wise as Strings. The Strings are
obtained by calling the <em>toString()</em> method of each element.</li>
<li><code>writeAsFormattedText()</code> / <code>TextOutputFormat</code> - Write elements line-wise as Strings. The Strings
are obtained by calling a user-defined <em>format()</em> method for each element.</li>
<li><code>writeAsCsv(...)</code> / <code>CsvOutputFormat</code> - Writes tuples as comma-separated value files. Row and field
delimiters are configurable. The value for each field comes from the <em>toString()</em> method of the objects.</li>
<li><code>print()</code> / <code>printToErr()</code> / <code>print(String msg)</code> / <code>printToErr(String msg)</code> - Prints the <em>toString()</em> value
of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is
prepended to the output. This can help to distinguish between different calls to <em>print</em>. If the parallelism is
greater than 1, the output will also be prepended with the identifier of the task which produced the output.</li>
<li><code>write()</code> / <code>FileOutputFormat</code> - Method and base class for custom file outputs. Supports
custom object-to-bytes conversion.</li>
<li><code>output()</code>/ <code>OutputFormat</code> - Most generic output method, for data sinks that are not file based
(such as storing the result in a database).</li>
</ul>
<p>A DataSet can be input to multiple operations. Programs can write or print a data set and at the
same time run additional transformations on it.</p>
<p><strong>Examples</strong></p>
<p>Standard data sink methods:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// text data </span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">textData</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="c1">// write DataSet to a file on the local file system</span>
<span class="n">textData</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(</span><span class="s">&quot;file:///my/result/on/localFS&quot;</span><span class="o">);</span>
<span class="c1">// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort</span>
<span class="n">textData</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(</span><span class="s">&quot;hdfs://nnHost:nnPort/my/result/on/localFS&quot;</span><span class="o">);</span>
<span class="c1">// write DataSet to a file and overwrite the file if it exists</span>
<span class="n">textData</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(</span><span class="s">&quot;file:///my/result/on/localFS&quot;</span><span class="o">,</span> <span class="n">WriteMode</span><span class="o">.</span><span class="na">OVERWRITE</span><span class="o">);</span>
<span class="c1">// tuples as lines with pipe as the separator &quot;a|b|c&quot;</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">values</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">values</span><span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot;|&quot;</span><span class="o">);</span>
<span class="c1">// this writes tuples in the text formatting &quot;(a, b, c)&quot;, rather than as CSV lines</span>
<span class="n">values</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="o">);</span>
<span class="c1">// this writes values as strings using a user-defined TextFormatter object</span>
<span class="n">values</span><span class="o">.</span><span class="na">writeAsFormattedText</span><span class="o">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="o">,</span>
<span class="k">new</span> <span class="n">TextFormatter</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">format</span> <span class="o">(</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">f1</span> <span class="o">+</span> <span class="s">&quot; - &quot;</span> <span class="o">+</span> <span class="n">value</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
<p>Using a custom output format:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">myResult</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="c1">// write Tuple DataSet to a relational database</span>
<span class="n">myResult</span><span class="o">.</span><span class="na">output</span><span class="o">(</span>
<span class="c1">// build and configure OutputFormat</span>
<span class="n">JDBCOutputFormat</span><span class="o">.</span><span class="na">buildJDBCOutputFormat</span><span class="o">()</span>
<span class="o">.</span><span class="na">setDrivername</span><span class="o">(</span><span class="s">&quot;org.apache.derby.jdbc.EmbeddedDriver&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">setDBUrl</span><span class="o">(</span><span class="s">&quot;jdbc:derby:memory:persons&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">setQuery</span><span class="o">(</span><span class="s">&quot;insert into persons (name, age, height) values (?,?,?)&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">finish</span><span class="o">()</span>
<span class="o">);</span></code></pre></div>
<h4 id="locally-sorted-output">Locally Sorted Output</h4>
<p>The output of a data sink can be locally sorted on specified fields in specified orders using <a href="#define-keys-for-tuples">tuple field positions</a> or <a href="#define-keys-using-field-expressions">field expressions</a>. This works for every output format.</p>
<p>The following examples show how to use this feature:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">tData</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">BookPojo</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">pData</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">sData</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="c1">// sort output on String field in ascending order</span>
<span class="n">tData</span><span class="o">.</span><span class="na">print</span><span class="o">().</span><span class="na">sortLocalOutput</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">);</span>
<span class="c1">// sort output on Double field in descending and Integer field in ascending order</span>
<span class="n">tData</span><span class="o">.</span><span class="na">print</span><span class="o">().</span><span class="na">sortLocalOutput</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">DESCENDING</span><span class="o">).</span><span class="na">sortLocalOutput</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">);</span>
<span class="c1">// sort output on the &quot;author&quot; field of nested BookPojo in descending order</span>
<span class="n">pData</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(...).</span><span class="na">sortLocalOutput</span><span class="o">(</span><span class="s">&quot;f0.author&quot;</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">DESCENDING</span><span class="o">);</span>
<span class="c1">// sort output on the full tuple in ascending order</span>
<span class="n">tData</span><span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(...).</span><span class="na">sortLocalOutput</span><span class="o">(</span><span class="s">&quot;*&quot;</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">);</span>
<span class="c1">// sort atomic type (String) output in descending order</span>
<span class="n">sData</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(...).</span><span class="na">sortLocalOutput</span><span class="o">(</span><span class="s">&quot;*&quot;</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">DESCENDING</span><span class="o">);</span></code></pre></div>
<p>Globally sorted output is not supported yet.</p>
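<p>The difference between locally and globally sorted output can be sketched in plain Java. This is only an illustration under the assumption that each parallel sink instance sorts its own partition independently. <code>LocalSortSketch</code> is a hypothetical class, not Flink API.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration, not part of Flink.
public class LocalSortSketch {

    /** Sorts every partition on its own; nothing is compared across partitions. */
    public static List&lt;List&lt;Integer&gt;&gt; sortLocally(List&lt;List&lt;Integer&gt;&gt; partitions) {
        List&lt;List&lt;Integer&gt;&gt; result = new ArrayList&lt;&gt;();
        for (List&lt;Integer&gt; partition : partitions) {
            List&lt;Integer&gt; sorted = new ArrayList&lt;&gt;(partition);
            Collections.sort(sorted);
            result.add(sorted);
        }
        return result;
    }

    public static void main(String[] args) {
        List&lt;List&lt;Integer&gt;&gt; partitions = Arrays.asList(
                Arrays.asList(5, 1, 9),
                Arrays.asList(4, 8, 2));
        // each partition is ordered, but their concatenation is not
        System.out.println(sortLocally(partitions)); // prints [[1, 5, 9], [2, 4, 8]]
    }
}</code></pre></div>
<p>Each inner list is ordered, but reading the partitions one after another gives 1, 5, 9, 2, 4, 8, which is not a total order.</p>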
</div>
<div data-lang="scala">
<p>Data sinks consume DataSets and are used to store or return them. Data sink operations are described
using an
<a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java">OutputFormat</a>.
Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
DataSet:</p>
<ul>
<li><code>writeAsText()</code> / <code>TextOutputFormat</code> - Writes elements line-wise as Strings. The Strings are
obtained by calling the <em>toString()</em> method of each element.</li>
<li><code>writeAsCsv(...)</code> / <code>CsvOutputFormat</code> - Writes tuples as comma-separated value files. Row and field
delimiters are configurable. The value for each field comes from the <em>toString()</em> method of the objects.</li>
<li><code>print()</code> / <code>printToErr()</code> - Prints the <em>toString()</em> value of each element on the
standard out / standard error stream.</li>
<li><code>write()</code> / <code>FileOutputFormat</code> - Method and base class for custom file outputs. Supports
custom object-to-bytes conversion.</li>
<li><code>output()</code>/ <code>OutputFormat</code> - Most generic output method, for data sinks that are not file based
(such as storing the result in a database).</li>
</ul>
<p>A DataSet can be input to multiple operations. Programs can write or print a data set and at the
same time run additional transformations on it.</p>
<p><strong>Examples</strong></p>
<p>Standard data sink methods:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// text data </span>
<span class="k">val</span> <span class="n">textData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="c1">// write DataSet to a file on the local file system</span>
<span class="n">textData</span><span class="o">.</span><span class="n">writeAsText</span><span class="o">(</span><span class="s">&quot;file:///my/result/on/localFS&quot;</span><span class="o">)</span>
<span class="c1">// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort</span>
<span class="n">textData</span><span class="o">.</span><span class="n">writeAsText</span><span class="o">(</span><span class="s">&quot;hdfs://nnHost:nnPort/my/result/on/localFS&quot;</span><span class="o">)</span>
<span class="c1">// write DataSet to a file and overwrite the file if it exists</span>
<span class="n">textData</span><span class="o">.</span><span class="n">writeAsText</span><span class="o">(</span><span class="s">&quot;file:///my/result/on/localFS&quot;</span><span class="o">,</span> <span class="nc">WriteMode</span><span class="o">.</span><span class="nc">OVERWRITE</span><span class="o">)</span>
<span class="c1">// tuples as lines with pipe as the separator &quot;a|b|c&quot;</span>
<span class="k">val</span> <span class="n">values</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="n">values</span><span class="o">.</span><span class="n">writeAsCsv</span><span class="o">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot;|&quot;</span><span class="o">)</span>
<span class="c1">// this writes tuples in the text formatting &quot;(a, b, c)&quot;, rather than as CSV lines</span>
<span class="n">values</span><span class="o">.</span><span class="n">writeAsText</span><span class="o">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="o">);</span>
<span class="c1">// this writes values as strings using a user-defined formatting</span>
<span class="n">values</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">tuple</span> <span class="k">=&gt;</span> <span class="n">tuple</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="s">&quot; - &quot;</span> <span class="o">+</span> <span class="n">tuple</span><span class="o">.</span><span class="n">_2</span> <span class="o">}</span>
<span class="o">.</span><span class="n">writeAsText</span><span class="o">(</span><span class="s">&quot;file:///path/to/the/result/file&quot;</span><span class="o">)</span></code></pre></div>
<h4 id="locally-sorted-output-1">Locally Sorted Output</h4>
<p>The output of a data sink can be locally sorted on specified fields in specified orders using <a href="#define-keys-for-tuples">tuple field positions</a> or <a href="#define-keys-using-field-expressions">field expressions</a>. This works for every output format.</p>
<p>The following examples show how to use this feature:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">tData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">pData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">BookPojo</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">sData</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="c1">// sort output on String field in ascending order</span>
<span class="n">tData</span><span class="o">.</span><span class="n">print</span><span class="o">.</span><span class="n">sortLocalOutput</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">);</span>
<span class="c1">// sort output on Double field in descending and Int field in ascending order</span>
<span class="n">tData</span><span class="o">.</span><span class="n">print</span><span class="o">.</span><span class="n">sortLocalOutput</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">DESCENDING</span><span class="o">).</span><span class="n">sortLocalOutput</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">);</span>
<span class="c1">// sort output on the &quot;author&quot; field of nested BookPojo in descending order</span>
<span class="n">pData</span><span class="o">.</span><span class="n">writeAsText</span><span class="o">(...).</span><span class="n">sortLocalOutput</span><span class="o">(</span><span class="s">&quot;_1.author&quot;</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">DESCENDING</span><span class="o">);</span>
<span class="c1">// sort output on the full tuple in ascending order</span>
<span class="n">tData</span><span class="o">.</span><span class="n">writeAsCsv</span><span class="o">(...).</span><span class="n">sortLocalOutput</span><span class="o">(</span><span class="s">&quot;_&quot;</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">);</span>
<span class="c1">// sort atomic type (String) output in descending order</span>
<span class="n">sData</span><span class="o">.</span><span class="n">writeAsText</span><span class="o">(...).</span><span class="n">sortLocalOutput</span><span class="o">(</span><span class="s">&quot;_&quot;</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">DESCENDING</span><span class="o">);</span></code></pre></div>
<p>Globally sorted output is not supported yet.</p>
</div>
</div>
<p><a href="#top">Back to top</a></p>
<h2 id="debugging">Debugging</h2>
<p>Before running a data analysis program on a large data set in a distributed cluster, it is a good
idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
programs is usually an incremental process of checking results, debugging, and improving.</p>
<p>Flink eases the development of data analysis programs by supporting local debugging from within an
IDE, injection of test data, and collection of result data. This section gives some hints on how to
use these features when developing Flink programs.</p>
<h3 id="local-execution-environment">Local Execution Environment</h3>
<p>A <code>LocalEnvironment</code> starts a Flink system within the same JVM process it was created in. If you
start the <code>LocalEnvironment</code> from an IDE, you can set breakpoints in your code and easily debug your
program.</p>
<p>A LocalEnvironment is created and used as follows:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">createLocalEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">lines</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="n">pathToTextFile</span><span class="o">);</span>
<span class="c1">// build your program</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">();</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">createLocalEnvironment</span><span class="o">()</span>
<span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readTextFile</span><span class="o">(</span><span class="n">pathToTextFile</span><span class="o">)</span>
<span class="c1">// build your program</span>
<span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">()</span></code></pre></div>
</div>
</div>
<h3 id="collection-data-sources-and-sinks">Collection Data Sources and Sinks</h3>
<p>Providing input for an analysis program and checking its output is cumbersome when done by creating
input files and reading output files. Flink features special data sources and sinks which are backed
by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
easily replaced by sources and sinks that read from / write to external data stores such as HDFS.</p>
<p>Collection data sources can be used as follows:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">createLocalEnvironment</span><span class="o">();</span>
<span class="c1">// Create a DataSet from a list of elements</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">myInts</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> <span class="mi">5</span><span class="o">);</span>
<span class="c1">// Create a DataSet from any Java collection</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">data</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">myTuples</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromCollection</span><span class="o">(</span><span class="n">data</span><span class="o">);</span>
<span class="c1">// Create a DataSet from an Iterator</span>
<span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">longIt</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">myLongs</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromCollection</span><span class="o">(</span><span class="n">longIt</span><span class="o">,</span> <span class="n">Long</span><span class="o">.</span><span class="na">class</span><span class="o">);</span></code></pre></div>
<p>A collection data sink is specified as follows:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">myResult</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">outData</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;();</span>
<span class="n">myResult</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="k">new</span> <span class="nf">LocalCollectionOutputFormat</span><span class="o">(</span><span class="n">outData</span><span class="o">));</span></code></pre></div>
<p><strong>Note:</strong> Currently, the collection data sink is restricted to local execution, as a debugging tool.</p>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">createLocalEnvironment</span><span class="o">()</span>
<span class="c1">// Create a DataSet from a list of elements</span>
<span class="k">val</span> <span class="n">myInts</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> <span class="mi">5</span><span class="o">)</span>
<span class="c1">// Create a DataSet from any Collection</span>
<span class="k">val</span> <span class="n">data</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">myTuples</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromCollection</span><span class="o">(</span><span class="n">data</span><span class="o">)</span>
<span class="c1">// Create a DataSet from an Iterator</span>
<span class="k">val</span> <span class="n">longIt</span><span class="k">:</span> <span class="kt">Iterator</span><span class="o">[</span><span class="kt">Long</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
<span class="k">val</span> <span class="n">myLongs</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromCollection</span><span class="o">(</span><span class="n">longIt</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p><strong>Note:</strong> Currently, the collection data source requires that data types and iterators implement
<code>Serializable</code>. Furthermore, collection data sources cannot be executed in parallel
(parallelism = 1).</p>
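<p>Declaring <code>implements Serializable</code> is not sufficient if a field of the type is itself not
serializable. The following plain-Java sketch, which does not use Flink, shows one way to check this before
handing a collection to a data source. The class <code>SerializableCheck</code>, the element type
<code>Reading</code>, and the helper <code>roundTrip</code> are hypothetical names made up for this illustration,
and the sketch assumes that standard Java serialization is the relevant mechanism.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCheck {

    /** Hypothetical element type, used only for this illustration. */
    static class Reading implements Serializable {
        private static final long serialVersionUID = 1L;
        final String sensor;
        final double value;

        Reading(String sensor, double value) {
            this.sensor = sensor;
            this.value = value;
        }
    }

    /** Writes the object with Java serialization and reads it back; throws if any part is not serializable. */
    static Object roundTrip(Object original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Reading copy = (Reading) roundTrip(new Reading("s1", 21.5));
        System.out.println(copy.sensor + " " + copy.value);
    }
}</code></pre></div>
<p>Passing an object that is not serializable, for example <code>new Object()</code>, makes <code>roundTrip</code>
throw a <code>java.io.NotSerializableException</code>.</p>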
<p><a href="#top">Back to top</a></p>
<h2 id="iteration-operators">Iteration Operators</h2>
<p>Iterations implement loops in Flink programs. The iteration operators encapsulate a part of the
program and execute it repeatedly, feeding back the result of one iteration (the partial solution)
into the next iteration. There are two types of iterations in Flink: <strong>BulkIteration</strong> and
<strong>DeltaIteration</strong>.</p>
<p>This section provides quick examples on how to use both operators. Check out the <a href="iterations.html">Introduction to
Iterations</a> page for a more detailed introduction.</p>
<div class="codetabs">
<div data-lang="java">
<h4 id="bulk-iterations">Bulk Iterations</h4>
<p>To create a BulkIteration, call the <code>iterate(int)</code> method of the DataSet the iteration should start
at. This will return an <code>IterativeDataSet</code>, which can be transformed with the regular operators. The
single argument to the iterate call specifies the maximum number of iterations.</p>
<p>To end an iteration, call the <code>closeWith(DataSet)</code> method on the <code>IterativeDataSet</code> and
pass the transformation whose result should be fed back to the next iteration. You can optionally specify a
termination criterion with <code>closeWith(DataSet, DataSet)</code>, which evaluates the second DataSet and
terminates the iteration if this DataSet is empty. If no termination criterion is specified, the
iteration terminates after the given maximum number of iterations.</p>
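<p>The control flow described above can be sketched in plain Java without any Flink dependency. This is
only an illustration of the semantics, not Flink's implementation: data sets are modelled as
<code>int[]</code>, and <code>BulkIterationSketch</code>, <code>Step</code>, <code>Criterion</code>, and
<code>iterate</code> are hypothetical names. The sketch also assumes that the termination criterion is derived
from the result of the current step.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Arrays;

public class BulkIterationSketch {

    /** Hypothetical stand-in for the step function: maps one partial solution to the next. */
    interface Step {
        int[] apply(int[] partialSolution);
    }

    /** Hypothetical stand-in for the termination criterion: an empty result ends the loop. */
    interface Criterion {
        int[] apply(int[] nextPartialSolution);
    }

    /** Applies the step at most maxIterations times and stops early once the criterion is empty. */
    static int[] iterate(int[] initial, int maxIterations, Step step, Criterion criterion) {
        int[] partialSolution = initial;
        for (int i = 0; i &lt; maxIterations; i++) {
            // Feed the previous result back into the step function.
            partialSolution = step.apply(partialSolution);
            if (criterion.apply(partialSolution).length == 0) {
                break; // the criterion data set is empty
            }
        }
        return partialSolution;
    }

    public static void main(String[] args) {
        // Step: halve every element. Criterion: the elements that are still greater than 1.
        Step halve = data -&gt; Arrays.stream(data).map(v -&gt; v / 2).toArray();
        Criterion aboveOne = data -&gt; Arrays.stream(data).filter(v -&gt; v &gt; 1).toArray();

        int[] result = iterate(new int[] {64, 20, 7}, 100, halve, aboveOne);
        System.out.println(Arrays.toString(result)); // prints [1, 0, 0]
    }
}</code></pre></div>
<p>In this run the loop stops after six steps although up to 100 are allowed, because no element greater
than 1 is left.</p>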
<p>The following example iteratively estimates the number Pi. The goal is to count the random
points that fall inside the unit circle. Each iteration picks a random point in the unit square. If this
point lies inside the unit circle, we increment the count. Pi is then estimated as the resulting count
divided by the number of iterations, multiplied by 4.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// Create initial IterativeDataSet</span>
<span class="n">IterativeDataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">initial</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">iterate</span><span class="o">(</span><span class="mi">10000</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">iteration</span> <span class="o">=</span> <span class="n">initial</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Integer</span> <span class="n">i</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="kt">double</span> <span class="n">x</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">random</span><span class="o">();</span>
<span class="kt">double</span> <span class="n">y</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">random</span><span class="o">();</span>
<span class="k">return</span> <span class="n">i</span> <span class="o">+</span> <span class="o">((</span><span class="n">x</span> <span class="o">*</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span> <span class="o">*</span> <span class="n">y</span> <span class="o">&lt;</span> <span class="mi">1</span><span class="o">)</span> <span class="o">?</span> <span class="mi">1</span> <span class="o">:</span> <span class="mi">0</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">});</span>
<span class="c1">// Iteratively transform the IterativeDataSet</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">count</span> <span class="o">=</span> <span class="n">initial</span><span class="o">.</span><span class="na">closeWith</span><span class="o">(</span><span class="n">iteration</span><span class="o">);</span>
<span class="n">count</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Double</span> <span class="nf">map</span><span class="o">(</span><span class="n">Integer</span> <span class="n">count</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">count</span> <span class="o">/</span> <span class="o">(</span><span class="kt">double</span><span class="o">)</span> <span class="mi">10000</span> <span class="o">*</span> <span class="mi">4</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}).</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Iterative Pi Example&quot;</span><span class="o">);</span></code></pre></div>
<p>You can also check out the
<a href="https://github.com/apache/flink/blob/master//flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java">K-Means example</a>,
which uses a BulkIteration to cluster a set of unlabeled points.</p>
<h4 id="delta-iterations">Delta Iterations</h4>
<p>Delta iterations exploit the fact that certain algorithms do not change every data point of the
solution in each iteration.</p>
<p>In addition to the partial solution that is fed back (called workset) in every iteration, delta
iterations maintain state across iterations (called solution set), which can be updated through
deltas. The result of the iterative computation is the state after the last iteration. Please refer
to the <a href="iterations.html">Introduction to Iterations</a> for an overview of the basic principle of delta
iterations.</p>
<p>Defining a DeltaIteration is similar to defining a BulkIteration. For delta iterations, two data
sets form the input to each iteration (workset and solution set), and two data sets are produced as
the result (new workset, solution set delta) in each iteration.</p>
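<p>The interplay of workset, solution set, and solution set delta can be sketched in plain Java without
any Flink dependency. The sketch below propagates the smallest vertex id through a small undirected graph.
It only illustrates the semantics under simplifying assumptions: the solution set is an <code>int[]</code>
indexed by key, and <code>DeltaIterationSketch</code> and <code>propagateMinLabel</code> are hypothetical
names.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Arrays;
import java.util.stream.IntStream;

public class DeltaIterationSketch {

    /**
     * Propagates the smallest vertex id to every vertex of an undirected graph.
     * solutionSet[v] is the state kept for key v; the workset lists the keys
     * whose state changed in the previous superstep.
     */
    static int[] propagateMinLabel(int vertexCount, int[][] edges, int maxIterations) {
        int[] solutionSet = IntStream.range(0, vertexCount).toArray(); // each vertex starts with its own id
        int[] workset = solutionSet.clone();                           // initially every key counts as changed

        for (int i = 0; i &lt; maxIterations &amp;&amp; workset.length &gt; 0; i++) {
            // Solution set delta of this superstep; -1 means "no update for this key".
            int[] delta = new int[vertexCount];
            Arrays.fill(delta, -1);

            for (int changed : workset) {
                int candidate = solutionSet[changed];
                for (int[] edge : edges) {
                    if (edge[0] != changed &amp;&amp; edge[1] != changed) {
                        continue; // this edge does not touch the changed vertex
                    }
                    int neighbor = edge[0] == changed ? edge[1] : edge[0];
                    int best = delta[neighbor] == -1 ? solutionSet[neighbor] : delta[neighbor];
                    if (candidate &lt; best) {
                        delta[neighbor] = candidate;
                    }
                }
            }

            // Keys with an update form the next workset; the delta is merged into the solution set.
            workset = IntStream.range(0, vertexCount).filter(v -&gt; delta[v] != -1).toArray();
            for (int v : workset) {
                solutionSet[v] = delta[v];
            }
        }
        return solutionSet;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {1, 2}, {3, 4}};
        System.out.println(Arrays.toString(propagateMinLabel(5, edges, 100))); // prints [0, 0, 0, 3, 3]
    }
}</code></pre></div>
<p>The workset shrinks from five keys to three, then to one, and is empty after the third superstep, so
the loop ends long before the maximum of 100 iterations.</p>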
<p>To create a DeltaIteration, call <code>iterateDelta(DataSet, int, int)</code> (or <code>iterateDelta(DataSet,
int, int[])</code> respectively) on the initial solution set. The arguments are the
initial workset (called <code>initialDeltaSet</code> in the example below), the maximum number of iterations, and the key positions. The returned
<code>DeltaIteration</code> object gives you access to the DataSets representing the workset and solution set
via the methods <code>iteration.getWorkset()</code> and <code>iteration.getSolutionSet()</code>.</p>
<p>Below is an example of the syntax of a delta iteration:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// read the initial data sets</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">initialSolutionSet</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">initialDeltaSet</span> <span class="o">=</span> <span class="c1">// [...]</span>
<span class="kt">int</span> <span class="n">maxIterations</span> <span class="o">=</span> <span class="mi">100</span><span class="o">;</span>
<span class="kt">int</span> <span class="n">keyPosition</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="n">DeltaIteration</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">iteration</span> <span class="o">=</span> <span class="n">initialSolutionSet</span>
<span class="o">.</span><span class="na">iterateDelta</span><span class="o">(</span><span class="n">initialDeltaSet</span><span class="o">,</span> <span class="n">maxIterations</span><span class="o">,</span> <span class="n">keyPosition</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">candidateUpdates</span> <span class="o">=</span> <span class="n">iteration</span><span class="o">.</span><span class="na">getWorkset</span><span class="o">()</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">ComputeCandidateChanges</span><span class="o">());</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">deltas</span> <span class="o">=</span> <span class="n">candidateUpdates</span>
<span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">iteration</span><span class="o">.</span><span class="na">getSolutionSet</span><span class="o">())</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="nf">CompareChangesToCurrent</span><span class="o">());</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">nextWorkset</span> <span class="o">=</span> <span class="n">deltas</span>
<span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="nf">FilterByThreshold</span><span class="o">());</span>
<span class="n">iteration</span><span class="o">.</span><span class="na">closeWith</span><span class="o">(</span><span class="n">deltas</span><span class="o">,</span> <span class="n">nextWorkset</span><span class="o">)</span>
<span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<h4 id="bulk-iterations-1">Bulk Iterations</h4>
<p>To create a BulkIteration, call the <code>iterate(int)</code> method of the DataSet the iteration should start
at and also specify a step function. The step function gets the input DataSet for the current
iteration and must return a new DataSet. The parameter of the iterate call is the maximum number
of iterations after which to stop.</p>
<p>There is also the <code>iterateWithTermination(int)</code> function that accepts a step function that
returns two DataSets: The result of the iteration step and a termination criterion. The iterations
are stopped once the termination criterion DataSet is empty.</p>
<p>The following example iteratively estimates the number Pi. The goal is to count the random
points that fall inside the unit circle. Each iteration picks a random point in the unit square. If this
point lies inside the unit circle, we increment the count. Pi is then estimated as the resulting count
divided by the number of iterations, multiplied by 4.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="c1">// Create initial DataSet</span>
<span class="k">val</span> <span class="n">initial</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="k">val</span> <span class="n">count</span> <span class="k">=</span> <span class="n">initial</span><span class="o">.</span><span class="n">iterate</span><span class="o">(</span><span class="mi">10000</span><span class="o">)</span> <span class="o">{</span> <span class="n">iterationInput</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">iterationInput</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">i</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="n">x</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">random</span><span class="o">()</span>
<span class="k">val</span> <span class="n">y</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">random</span><span class="o">()</span>
<span class="n">i</span> <span class="o">+</span> <span class="o">(</span><span class="k">if</span> <span class="o">(</span><span class="n">x</span> <span class="o">*</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span> <span class="o">*</span> <span class="n">y</span> <span class="o">&lt;</span> <span class="mi">1</span><span class="o">)</span> <span class="mi">1</span> <span class="k">else</span> <span class="mi">0</span><span class="o">)</span>
<span class="o">}</span>
<span class="n">result</span>
<span class="o">}</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">count</span> <span class="n">map</span> <span class="o">{</span> <span class="n">c</span> <span class="k">=&gt;</span> <span class="n">c</span> <span class="o">/</span> <span class="mf">10000.0</span> <span class="o">*</span> <span class="mi">4</span> <span class="o">}</span>
<span class="n">result</span><span class="o">.</span><span class="n">print</span><span class="o">()</span>
<span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">&quot;Iterative Pi Example&quot;</span><span class="o">)</span></code></pre></div>
<p>You can also check out the
<a href="https://github.com/apache/flink/blob/master//flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala">K-Means example</a>,
which uses a BulkIteration to cluster a set of unlabeled points.</p>
<h4 id="delta-iterations-1">Delta Iterations</h4>
<p>Delta iterations exploit the fact that certain algorithms do not change every data point of the
solution in each iteration.</p>
<p>In addition to the partial solution that is fed back (called workset) in every iteration, delta
iterations maintain state across iterations (called solution set), which can be updated through
deltas. The result of the iterative computation is the state after the last iteration. Please refer
to the <a href="iterations.html">Introduction to Iterations</a> for an overview of the basic principle of delta
iterations.</p>
<p>Defining a DeltaIteration is similar to defining a BulkIteration. For delta iterations, two data
sets form the input to each iteration (workset and solution set), and two data sets are produced as
the result (new workset, solution set delta) in each iteration.</p>
<p>To create a DeltaIteration, call <code>iterateDelta(initialWorkset, maxIterations, key)</code> on the
initial solution set and pass a step function. The step function takes two parameters, (solutionSet, workset), and must
return two values: (solutionSetDelta, newWorkset).</p>
<p>Below is an example of the syntax of a delta iteration:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// read the initial data sets</span>
<span class="k">val</span> <span class="n">initialSolutionSet</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Long</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">initialWorkset</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Long</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
<span class="k">val</span> <span class="n">maxIterations</span> <span class="k">=</span> <span class="mi">100</span>
<span class="k">val</span> <span class="n">keyPosition</span> <span class="k">=</span> <span class="mi">0</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">initialSolutionSet</span><span class="o">.</span><span class="n">iterateDelta</span><span class="o">(</span><span class="n">initialWorkset</span><span class="o">,</span> <span class="n">maxIterations</span><span class="o">,</span> <span class="nc">Array</span><span class="o">(</span><span class="n">keyPosition</span><span class="o">))</span> <span class="o">{</span>
<span class="o">(</span><span class="n">solution</span><span class="o">,</span> <span class="n">workset</span><span class="o">)</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="n">candidateUpdates</span> <span class="k">=</span> <span class="n">workset</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nc">ComputeCandidateChanges</span><span class="o">())</span>
<span class="k">val</span> <span class="n">deltas</span> <span class="k">=</span> <span class="n">candidateUpdates</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">solution</span><span class="o">).</span><span class="n">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)(</span><span class="k">new</span> <span class="nc">CompareChangesToCurrent</span><span class="o">())</span>
<span class="k">val</span> <span class="n">nextWorkset</span> <span class="k">=</span> <span class="n">deltas</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="k">new</span> <span class="nc">FilterByThreshold</span><span class="o">())</span>
<span class="o">(</span><span class="n">deltas</span><span class="o">,</span> <span class="n">nextWorkset</span><span class="o">)</span>
<span class="o">}</span>
<span class="n">result</span><span class="o">.</span><span class="n">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">)</span>
<span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">()</span></code></pre></div>
</div>
</div>
<p><a href="#top">Back to top</a></p>
<h2 id="semantic-annotations">Semantic Annotations</h2>
<p>Semantic annotations can be used to give Flink hints about the behavior of a function.
They tell the system which fields of a function's input the function reads and evaluates and
which fields it forwards unmodified from its input to its output.
Semantic annotations are a powerful means to speed up execution, because they
allow the system to reason about reusing sort orders or partitions across multiple operations. Using
semantic annotations may save the program from unnecessary data shuffling or unnecessary
sorts and significantly improve the performance of a program.</p>
<p><strong>Note:</strong> The use of semantic annotations is optional. However, it is absolutely crucial to
be conservative when providing semantic annotations!
Incorrect semantic annotations will cause Flink to make incorrect assumptions about your program and
might eventually lead to incorrect results.
If the behavior of an operator is not clearly predictable, no annotation should be provided.
Please read the documentation carefully.</p>
<p>The following semantic annotations are currently supported.</p>
<h4 id="forwarded-fields-annotation">Forwarded Fields Annotation</h4>
<p>Forwarded fields information declares input fields that a function forwards unmodified to the same position or to another position in the output.
This information is used by the optimizer to infer whether a data property such as sorting or
partitioning is preserved by a function.
For functions that operate on groups of input elements such as <code>GroupReduce</code>, <code>GroupCombine</code>, <code>CoGroup</code>, and <code>MapPartition</code>, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function's input group.</p>
<p>Field forward information is specified using <a href="#define-keys-using-field-expressions">field expressions</a>.
Fields that are forwarded to the same position in the output can be specified by their position.
The specified position must be valid for the input and output data type and have the same type.
For example, the String <code>"f2"</code> declares that the third field of a Java input tuple is always equal to the third field in the output tuple.</p>
<p>Fields which are forwarded unmodified to another position in the output are declared by specifying the
source field in the input and the target field in the output as field expressions.
The String <code>"f0-&gt;f2"</code> denotes that the first field of the Java input tuple is
copied unchanged to the third field of the Java output tuple. The wildcard expression <code>*</code> can be used to refer to a whole input or output type, e.g., <code>"f0-&gt;*"</code> denotes that the output of a function is always equal to the first field of its Java input tuple.</p>
<p>Multiple forwarded fields can be declared in a single String by separating them with semicolons as <code>"f0; f2-&gt;f1; f3-&gt;f2"</code> or in separate Strings <code>"f0", "f2-&gt;f1", "f3-&gt;f2"</code>. When specifying forwarded fields it is not required that all forwarded fields are declared, but all declarations must be correct.</p>
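<p>To make the declaration syntax concrete, the following plain-Java sketch expands such Strings into a source-to-target field mapping. The class <code>ForwardedFieldsSketch</code> and its <code>parse</code> method are hypothetical helpers written for this illustration; they are not part of Flink and do not reflect how Flink parses annotations internally. The sketch assumes flat tuple fields only and does not handle the wildcard <code>*</code> or nested field expressions.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ForwardedFieldsSketch {

    // Hypothetical helper (not a Flink API): expands declarations such as
    // "f0; f2->f1" into a map from source field to target field.
    public static Map<String, String> parse(String... declarations) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String declaration : declarations) {
            for (String part : declaration.split(";")) {
                String expr = part.trim();
                if (expr.isEmpty()) {
                    continue;
                }
                int arrow = expr.indexOf("->");
                if (arrow < 0) {
                    // A bare field such as "f0" stays at the same position.
                    mapping.put(expr, expr);
                } else {
                    String source = expr.substring(0, arrow).trim();
                    String target = expr.substring(arrow + 2).trim();
                    mapping.put(source, target);
                }
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Both notations describe the same forwarding information.
        System.out.println(parse("f0; f2->f1; f3->f2"));     // {f0=f0, f2=f1, f3=f2}
        System.out.println(parse("f0", "f2->f1", "f3->f2")); // {f0=f0, f2=f1, f3=f2}
    }
}
```

<p>Both calls yield the same mapping, which mirrors the statement above that a single semicolon-separated String and several separate Strings are equivalent.</p>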
<p>Forwarded field information can be declared by attaching Java annotations on function class definitions or
by passing them as operator arguments after invoking a function on a DataSet as shown below.</p>
<h5 id="function-class-annotations">Function Class Annotations</h5>
<ul>
<li><code>@ForwardedFields</code> for single input functions such as Map and Reduce.</li>
<li><code>@ForwardedFieldsFirst</code> for the first input of a function with two inputs such as Join and CoGroup.</li>
<li><code>@ForwardedFieldsSecond</code> for the second input of a function with two inputs such as Join and CoGroup.</li>
</ul>
<h5 id="operator-arguments">Operator Arguments</h5>
<ul>
<li><code>data.map(myMapFnc).withForwardedFields()</code> for single input functions such as Map and Reduce.</li>
<li><code>data1.join(data2).where().equalTo().with(myJoinFnc).withForwardedFieldsFirst()</code> for the first input of a function with two inputs such as Join and CoGroup.</li>
<li><code>data1.join(data2).where().equalTo().with(myJoinFnc).withForwardedFieldsSecond()</code> for the second input of a function with two inputs such as Join and CoGroup.</li>
</ul>
<p>Please note that field forward information which was specified as a class annotation cannot be overwritten by operator arguments.</p>
<h5 id="example">Example</h5>
<p>The following example shows how to declare forwarded field information using a function class annotation:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nd">@ForwardedFields</span><span class="o">(</span><span class="s">&quot;f0-&gt;f2&quot;</span><span class="o">)</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyMap</span> <span class="kd">implements</span>
<span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">val</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="s">&quot;foo&quot;</span><span class="o">,</span> <span class="n">val</span><span class="o">.</span><span class="na">f1</span> <span class="o">/</span> <span class="mi">2</span><span class="o">,</span> <span class="n">val</span><span class="o">.</span><span class="na">f0</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nd">@ForwardedFields</span><span class="o">(</span><span class="s">&quot;_1-&gt;_3&quot;</span><span class="o">)</span>
<span class="k">class</span> <span class="nc">MyMap</span> <span class="k">extends</span> <span class="nc">MapFunction</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]{</span>
<span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">value</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">Int</span><span class="o">))</span><span class="k">:</span> <span class="o">(</span><span class="kt">String</span><span class="o">,</span> <span class="kt">Int</span><span class="o">,</span> <span class="nc">Int</span><span class="o">)</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">return</span> <span class="o">(</span><span class="s">&quot;foo&quot;</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="n">_2</span> <span class="o">/</span> <span class="mi">2</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
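<p>Because an incorrect declaration can lead to incorrect results, it can help to check a declaration against sample inputs before attaching it. The following plain-Java sketch does this for the mapping logic of <code>MyMap</code> above, using <code>Object[]</code> arrays in place of Flink tuples. The class <code>ForwardingCheckSketch</code> and its methods are hypothetical and not part of Flink. A passing check does not prove that a declaration is correct for all inputs; it can only expose declarations that are wrong.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class ForwardingCheckSketch {

    // Hypothetical helper (not a Flink API): returns true if, for every sample,
    // the output field at index `target` equals the input field at index `source`.
    public static boolean forwards(Function<Object[], Object[]> fn,
                                   int source, int target,
                                   List<Object[]> samples) {
        for (Object[] input : samples) {
            Object[] output = fn.apply(input);
            if (!Objects.equals(input[source], output[target])) {
                return false;
            }
        }
        return true;
    }

    // Same logic as MyMap above, expressed on plain arrays:
    // (f0, f1) -> ("foo", f1 / 2, f0)
    public static Object[] myMap(Object[] in) {
        return new Object[] {"foo", (Integer) in[1] / 2, in[0]};
    }

    public static void main(String[] args) {
        List<Object[]> samples = Arrays.asList(
                new Object[] {1, 10},
                new Object[] {7, 4});

        // "f0->f2" holds on all samples.
        System.out.println(forwards(ForwardingCheckSketch::myMap, 0, 2, samples)); // true
        // "f1" (forwarded in place) does not hold, because f1 is halved.
        System.out.println(forwards(ForwardingCheckSketch::myMap, 1, 1, samples)); // false
    }
}
```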
<h4 id="non-forwarded-fields">Non-Forwarded Fields</h4>
<p>Non-forwarded fields information declares all fields which are not preserved at the same position in a function's output.
The values of all other fields are considered to be preserved at the same position in the output.
Hence, non-forwarded fields information is inverse to forwarded fields information.
Non-forwarded field information for group-wise operators such as <code>GroupReduce</code>, <code>GroupCombine</code>, <code>CoGroup</code>, and <code>MapPartition</code> must fulfill the same requirements as for forwarded field information.</p>
<p><strong>IMPORTANT</strong>: The specification of non-forwarded fields information is optional. However if used,
<strong>ALL!</strong> non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.</p>
<p>Non-forwarded fields are specified as a list of <a href="#define-keys-using-field-expressions">field expressions</a>. The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
For example both <code>"f1; f3"</code> and <code>"f1", "f3"</code> declare that the second and fourth field of a Java tuple
are not preserved in place and all other fields are preserved in place.
Non-forwarded field information can only be specified for functions which have identical input and output types.</p>
<p>Non-forwarded field information is specified as function class annotations using the following annotations:</p>
<ul>
<li><code>@NonForwardedFields</code> for single input functions such as Map and Reduce.</li>
<li><code>@NonForwardedFieldsFirst</code> for the first input of a function with two inputs such as Join and CoGroup.</li>
<li><code>@NonForwardedFieldsSecond</code> for the second input of a function with two inputs such as Join and CoGroup.</li>
</ul>
<h5 id="example-1">Example</h5>
<p>The following example shows how to declare non-forwarded field information:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nd">@NonForwardedFields</span><span class="o">(</span><span class="s">&quot;f1&quot;</span><span class="o">)</span> <span class="c1">// second field is not forwarded</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyMap</span> <span class="kd">implements</span>
<span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">val</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">val</span><span class="o">.</span><span class="na">f0</span><span class="o">,</span> <span class="n">val</span><span class="o">.</span><span class="na">f1</span> <span class="o">/</span> <span class="mi">2</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nd">@NonForwardedFields</span><span class="o">(</span><span class="s">&quot;_2&quot;</span><span class="o">)</span> <span class="c1">// second field is not forwarded</span>
<span class="k">class</span> <span class="nc">MyMap</span> <span class="k">extends</span> <span class="nc">MapFunction</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)</span>, <span class="o">(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]{</span>
<span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">value</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">Int</span><span class="o">))</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">return</span> <span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="n">_2</span> <span class="o">/</span> <span class="mi">2</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
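<p>The inverse relationship between the two kinds of declarations can be sketched in plain Java: given the number of tuple fields and the declared non-forwarded fields, every remaining field is treated as forwarded in place. The class <code>NonForwardedSketch</code> and its method <code>forwardedInPlace</code> are hypothetical helpers for this illustration and not part of Flink; the sketch assumes flat Java tuple field names of the form <code>f0</code>, <code>f1</code>, and so on.</p>

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NonForwardedSketch {

    // Hypothetical helper (not a Flink API): lists the fields of a tuple with
    // `arity` fields that are considered forwarded in place, given the
    // declared non-forwarded fields.
    public static List<String> forwardedInPlace(int arity, String... nonForwarded) {
        Set<String> excluded = new HashSet<>();
        for (String declaration : nonForwarded) {
            for (String part : declaration.split(";")) {
                String field = part.trim();
                if (!field.isEmpty()) {
                    excluded.add(field);
                }
            }
        }
        List<String> forwarded = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            String field = "f" + i;
            if (!excluded.contains(field)) {
                forwarded.add(field);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Tuple4 with "f1; f3" declared as non-forwarded.
        System.out.println(forwardedInPlace(4, "f1; f3")); // [f0, f2]
        // Tuple2 as in the example above: only f0 is forwarded in place.
        System.out.println(forwardedInPlace(2, "f1"));     // [f0]
    }
}
```

<p>This also shows why omitting a non-forwarded field is unsafe: any field missing from the declaration ends up in the forwarded list.</p>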
<h4 id="read-fields">Read Fields</h4>
<p>Read fields information declares all fields that are accessed and evaluated by a function, i.e.,
all fields that are used by the function to compute its result.
For example, fields which are evaluated in conditional statements or used for computations must be marked as read when specifying read fields information.
Fields which are only forwarded unmodified to the output without evaluating their values or fields which are not accessed at all are not considered to be read.</p>
<p><strong>IMPORTANT</strong>: The specification of read fields information is optional. However if used,
<strong>ALL!</strong> read fields must be specified. It is safe to declare a non-read field as read.</p>
<p>Read fields are specified as a list of <a href="#define-keys-using-field-expressions">field expressions</a>. The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
For example both <code>"f1; f3"</code> and <code>"f1", "f3"</code> declare that the second and fourth field of a Java tuple are read and evaluated by the function.</p>
<p>Read field information is specified as function class annotations using the following annotations:</p>
<ul>
<li><code>@ReadFields</code> for single input functions such as Map and Reduce.</li>
<li><code>@ReadFieldsFirst</code> for the first input of a function with two inputs such as Join and CoGroup.</li>
<li><code>@ReadFieldsSecond</code> for the second input of a function with two inputs such as Join and CoGroup.</li>
</ul>
<h5 id="example-2">Example</h5>
<p>The following example shows how to declare read field information:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nd">@ReadFields</span><span class="o">(</span><span class="s">&quot;f0; f3&quot;</span><span class="o">)</span> <span class="c1">// f0 and f3 are read and evaluated by the function. </span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyMap</span> <span class="kd">implements</span>
<span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple4</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span>
<span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple4</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">val</span><span class="o">)</span> <span class="o">{</span>
<span class="k">if</span><span class="o">(</span><span class="n">val</span><span class="o">.</span><span class="na">f0</span> <span class="o">==</span> <span class="mi">42</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">val</span><span class="o">.</span><span class="na">f0</span><span class="o">,</span> <span class="n">val</span><span class="o">.</span><span class="na">f1</span><span class="o">);</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">val</span><span class="o">.</span><span class="na">f3</span><span class="o">+</span><span class="mi">10</span><span class="o">,</span> <span class="n">val</span><span class="o">.</span><span class="na">f1</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nd">@ReadFields</span><span class="o">(</span><span class="s">&quot;_1; _4&quot;</span><span class="o">)</span> <span class="c1">// _1 and _4 are read and evaluated by the function.</span>
<span class="k">class</span> <span class="nc">MyMap</span> <span class="k">extends</span> <span class="nc">MapFunction</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span>, <span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)</span>, <span class="o">(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]{</span>
<span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">value</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">Int</span><span class="o">,</span> <span class="nc">Int</span><span class="o">,</span> <span class="nc">Int</span><span class="o">))</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="n">_1</span> <span class="o">==</span> <span class="mi">42</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="k">return</span> <span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="n">_4</span> <span class="o">+</span> <span class="mi">10</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
<p><a href="#top">Back to top</a></p>
<h2 id="broadcast-variables">Broadcast Variables</h2>
<p>Broadcast variables allow you to make a data set available to all parallel instances of an
operation, in addition to the regular input of the operation. This is useful for auxiliary data
sets, or data-dependent parameterization. The data set will then be accessible at the operator as a
Collection.</p>
<ul>
<li><strong>Broadcast</strong>: broadcast sets are registered by name via <code>withBroadcastSet(DataSet, String)</code>.</li>
<li><strong>Access</strong>: broadcast sets are accessible via <code>getRuntimeContext().getBroadcastVariable(String)</code> at the target operator.</li>
</ul>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// 1. The DataSet to be broadcasted</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">toBroadcast</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">data</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="s">&quot;a&quot;</span><span class="o">,</span> <span class="s">&quot;b&quot;</span><span class="o">);</span>
<span class="n">data</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">RichMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="c1">// 3. Access the broadcasted DataSet as a Collection</span>
<span class="n">Collection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">broadcastSet</span> <span class="o">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getBroadcastVariable</span><span class="o">(</span><span class="s">&quot;broadcastSetName&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="o">}).</span><span class="na">withBroadcastSet</span><span class="o">(</span><span class="n">toBroadcast</span><span class="o">,</span> <span class="s">&quot;broadcastSetName&quot;</span><span class="o">);</span> <span class="c1">// 2. Broadcast the DataSet</span></code></pre></div>
<p>Make sure that the names (<code>broadcastSetName</code> in the previous example) match when registering and
accessing broadcasted data sets. For a complete example program, have a look at
<a href="https://github.com/apache/flink/blob/master//flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java#L96">K-Means Algorithm</a>.</p>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// 1. The DataSet to be broadcasted</span>
<span class="k">val</span> <span class="n">toBroadcast</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">)</span>
<span class="k">val</span> <span class="n">data</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="s">&quot;a&quot;</span><span class="o">,</span> <span class="s">&quot;b&quot;</span><span class="o">)</span>
<span class="n">data</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">new</span> <span class="nc">RichMapFunction</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">]()</span> <span class="o">{</span>
<span class="k">var</span> <span class="n">broadcastSet</span><span class="k">:</span> <span class="kt">Traversable</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="kc">null</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">open</span><span class="o">(</span><span class="n">config</span><span class="k">:</span> <span class="kt">Configuration</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="c1">// 3. Access the broadcasted DataSet as a Collection</span>
<span class="n">broadcastSet</span> <span class="k">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="n">getBroadcastVariable</span><span class="o">[</span><span class="kt">Int</span><span class="o">](</span><span class="s">&quot;broadcastSetName&quot;</span><span class="o">).</span><span class="n">asScala</span>
<span class="o">}</span>
<span class="k">def</span> <span class="n">map</span><span class="o">(</span><span class="n">in</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="o">{</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="o">}).</span><span class="n">withBroadcastSet</span><span class="o">(</span><span class="n">toBroadcast</span><span class="o">,</span> <span class="s">&quot;broadcastSetName&quot;</span><span class="o">)</span> <span class="c1">// 2. Broadcast the DataSet</span></code></pre></div>
<p>Make sure that the names (<code>broadcastSetName</code> in the previous example) match when registering and
accessing broadcasted data sets. For a complete example program, have a look at
<a href="https://github.com/apache/flink/blob/master//flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala#L96">K-Means Algorithm</a>.</p>
</div>
</div>
<p><strong>Note</strong>: As the content of broadcast variables is kept in memory on each node, it should not become
too large. For simpler things like scalar values you can make the parameters part of the closure
of a function, or use the <code>withParameters(...)</code> method to pass in a configuration.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="passing-parameters-to-functions">Passing Parameters to Functions</h2>
<p>Parameters can be passed to functions using either the constructor or the <code>withParameters(Configuration)</code> method. The parameters are serialized as part of the function object and shipped to all parallel task instances.</p>
<p>Check also the <a href="best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application">best practices guide on how to pass command line arguments to functions</a>.</p>
<h4 id="via-constructor">Via Constructor</h4>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">toFilter</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">);</span>
<span class="n">toFilter</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyFilter</span><span class="o">(</span><span class="mi">2</span><span class="o">));</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MyFilter</span> <span class="kd">implements</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kd">final</span> <span class="kt">int</span> <span class="n">limit</span><span class="o">;</span>
<span class="kd">public</span> <span class="nf">MyFilter</span><span class="o">(</span><span class="kt">int</span> <span class="n">limit</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">limit</span> <span class="o">=</span> <span class="n">limit</span><span class="o">;</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="n">limit</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">toFilter</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">)</span>
<span class="n">toFilter</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="k">new</span> <span class="nc">MyFilter</span><span class="o">(</span><span class="mi">2</span><span class="o">))</span>
<span class="k">class</span> <span class="nc">MyFilter</span><span class="o">(</span><span class="n">limit</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">FilterFunction</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="o">{</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">filter</span><span class="o">(</span><span class="n">value</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span><span class="k">:</span> <span class="kt">Boolean</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">value</span> <span class="o">&gt;</span> <span class="n">limit</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
<h4 id="via-withparametersconfiguration">Via <code>withParameters(Configuration)</code></h4>
<p>This method takes a Configuration object as an argument, which will be passed to the <a href="#rich-functions">rich function</a>'s <code>open()</code>
method. The Configuration object is a Map from String keys to different value types.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">toFilter</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">);</span>
<span class="n">Configuration</span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Configuration</span><span class="o">();</span>
<span class="n">config</span><span class="o">.</span><span class="na">setInteger</span><span class="o">(</span><span class="s">&quot;limit&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span>
<span class="n">toFilter</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="n">RichFilterFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">limit</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">limit</span> <span class="o">=</span> <span class="n">parameters</span><span class="o">.</span><span class="na">getInteger</span><span class="o">(</span><span class="s">&quot;limit&quot;</span><span class="o">,</span> <span class="mi">0</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="n">limit</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}).</span><span class="na">withParameters</span><span class="o">(</span><span class="n">config</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">toFilter</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">fromElements</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">)</span>
<span class="k">val</span> <span class="n">c</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Configuration</span><span class="o">()</span>
<span class="n">c</span><span class="o">.</span><span class="n">setInteger</span><span class="o">(</span><span class="s">&quot;limit&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span>
<span class="n">toFilter</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="k">new</span> <span class="nc">RichFilterFunction</span><span class="o">[</span><span class="kt">Int</span><span class="o">]()</span> <span class="o">{</span>
<span class="k">var</span> <span class="n">limit</span> <span class="k">=</span> <span class="mi">0</span>
<span class="k">override</span> <span class="k">def</span> <span class="n">open</span><span class="o">(</span><span class="n">config</span><span class="k">:</span> <span class="kt">Configuration</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">limit</span> <span class="k">=</span> <span class="n">config</span><span class="o">.</span><span class="n">getInteger</span><span class="o">(</span><span class="s">&quot;limit&quot;</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">def</span> <span class="n">filter</span><span class="o">(</span><span class="n">in</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span><span class="k">:</span> <span class="kt">Boolean</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">in</span> <span class="o">&gt;</span> <span class="n">limit</span>
<span class="o">}</span>
<span class="o">}).</span><span class="n">withParameters</span><span class="o">(</span><span class="n">c</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h4 id="globally-via-the-executionconfig">Globally via the <code>ExecutionConfig</code></h4>
<p>Flink also allows you to pass custom configuration values to the <code>ExecutionConfig</code> interface of the environment. Since the execution config is accessible in all (rich) user functions, the custom configuration will be available globally in all functions.</p>
<p><strong>Setting a custom global configuration</strong></p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Configuration</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Configuration</span><span class="o">();</span>
<span class="n">conf</span><span class="o">.</span><span class="na">setString</span><span class="o">(</span><span class="s">&quot;mykey&quot;</span><span class="o">,</span><span class="s">&quot;myvalue&quot;</span><span class="o">);</span>
<span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">getConfig</span><span class="o">().</span><span class="na">setGlobalJobParameters</span><span class="o">(</span><span class="n">conf</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="k">val</span> <span class="n">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Configuration</span><span class="o">()</span>
<span class="n">conf</span><span class="o">.</span><span class="n">setString</span><span class="o">(</span><span class="s">&quot;mykey&quot;</span><span class="o">,</span> <span class="s">&quot;myvalue&quot;</span><span class="o">)</span>
<span class="n">env</span><span class="o">.</span><span class="n">getConfig</span><span class="o">.</span><span class="n">setGlobalJobParameters</span><span class="o">(</span><span class="n">conf</span><span class="o">)</span></code></pre></div>
</div>
</div>
<p>Please note that you can also pass a custom class extending the <code>ExecutionConfig.GlobalJobParameters</code> class as the global job parameters to the execution config. Such a class can implement the <code>Map&lt;String, String&gt; toMap()</code> method, which will in turn show the values from the configuration in the web frontend.</p>
<p><strong>Accessing values from the global configuration</strong></p>
<p>Objects in the global job parameters are accessible in many places in the system. All user functions implementing a <code>Rich*Function</code> interface have access through the runtime context.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">extends</span> <span class="n">RichFlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">String</span> <span class="n">mykey</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="kd">super</span><span class="o">.</span><span class="na">open</span><span class="o">(</span><span class="n">parameters</span><span class="o">);</span>
<span class="n">ExecutionConfig</span><span class="o">.</span><span class="na">GlobalJobParameters</span> <span class="n">globalParams</span> <span class="o">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getExecutionConfig</span><span class="o">().</span><span class="na">getGlobalJobParameters</span><span class="o">();</span>
<span class="n">Configuration</span> <span class="n">globConf</span> <span class="o">=</span> <span class="o">(</span><span class="n">Configuration</span><span class="o">)</span> <span class="n">globalParams</span><span class="o">;</span>
<span class="n">mykey</span> <span class="o">=</span> <span class="n">globConf</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">&quot;mykey&quot;</span><span class="o">,</span> <span class="kc">null</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// ... more here ...</span></code></pre></div>
<p><a href="#top">Back to top</a></p>
<h2 id="program-packaging--distributed-execution">Program Packaging &amp; Distributed Execution</h2>
<p>As described in the <a href="#program-skeleton">program skeleton</a> section, Flink programs can be executed on
clusters by using the <code>RemoteEnvironment</code>. Alternatively, programs can be packaged into JAR files
(Java Archives) for execution. Packaging the program is a prerequisite to executing it through the
<a href="cli.html">command line interface</a> or the <a href="web_client.html">web interface</a>.</p>
<h4 id="packaging-programs">Packaging Programs</h4>
<p>To support execution from a packaged JAR file via the command line or web interface, a program must
use the environment obtained by <code>ExecutionEnvironment.getExecutionEnvironment()</code>. This environment
will act as the cluster's environment when the JAR is submitted to the command line or web
interface. If the Flink program is invoked differently than through these interfaces, the
environment will act like a local environment.</p>
<p>To package the program, simply export all involved classes as a JAR file. The JAR file's manifest
must point to the class that contains the program's <em>entry point</em> (the class with the public
<code>main</code> method). The simplest way to do this is by putting the <em>main-class</em> entry into the
manifest (such as <code>main-class: org.apache.flink.example.MyProgram</code>). The <em>main-class</em> attribute is
the same one that is used by the Java Virtual Machine to find the main method when executing a JAR
file through the command <code>java -jar pathToTheJarFile</code>. Most IDEs offer to include that attribute
automatically when exporting JAR files.</p>
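<p>As a minimal sketch of the manifest entry described above, the following standalone snippet uses the JDK's
<code>java.util.jar.Manifest</code> class to build a manifest whose <em>main-class</em> attribute names an entry point.
The class name <code>ManifestSketch</code>, the helper <code>manifestFor</code>, and the entry point
<code>org.example.MyProgram</code> are placeholders chosen for illustration and are not part of Flink.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

public class ManifestSketch {

    // Builds an in-memory manifest whose Main-Class attribute names the entry point.
    static Manifest manifestFor(String entryPointClass) {
        Manifest manifest = new Manifest();
        Attributes attributes = manifest.getMainAttributes();
        // Manifest.write() requires the version attribute to be set beforehand.
        attributes.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        attributes.put(Attributes.Name.MAIN_CLASS, entryPointClass);
        return manifest;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder entry point; replace it with the class that holds your main method.
        Manifest manifest = manifestFor("org.example.MyProgram");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        manifest.write(out);
        // Prints the manifest text as it would appear in META-INF/MANIFEST.MF.
        System.out.print(out.toString("UTF-8"));
    }
}</code></pre></div>
<p>In practice the build tool or IDE export usually writes this file for you; the sketch only makes visible
which attribute ends up in the manifest.</p>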
<h4 id="packaging-programs-through-plans">Packaging Programs through Plans</h4>
<p>Additionally, we support packaging programs as <em>Plans</em>. Instead of defining a program in the main
method and calling
<code>execute()</code> on the environment, plan packaging returns the <em>Program Plan</em>, which is a description of
the program's data flow. To do that, the program must implement the
<code>org.apache.flink.api.common.Program</code> interface, defining the <code>getPlan(String...)</code> method. The
strings passed to that method are the command line arguments. The program's plan can be created from
the environment via the <code>ExecutionEnvironment#createProgramPlan()</code> method. When packaging the
program's plan, the JAR manifest must point to the class implementing the
<code>org.apache.flink.api.common.Program</code> interface, instead of the class with the main method.</p>
<h4 id="summary">Summary</h4>
<p>The overall procedure to invoke a packaged program is as follows:</p>
<ol>
<li>
<p>The JAR's manifest is searched for a <em>main-class</em> or <em>program-class</em> attribute. If both
attributes are found, the <em>program-class</em> attribute takes precedence over the <em>main-class</em>
attribute. Both the command line and the web interface support a parameter to pass the entry point
class name manually for cases where the JAR manifest contains neither attribute.</p>
</li>
<li>
<p>If the entry point class implements the <code>org.apache.flink.api.common.Program</code> interface, then the system
calls the <code>getPlan(String...)</code> method to obtain the program plan to execute. The
<code>getPlan(String...)</code> method was the only possible way of defining a program in the <em>Record API</em>
(see <a href="http://stratosphere.eu/docs/0.4/">0.4 docs</a>) and is also supported in the new Java API.</p>
</li>
<li>
<p>If the entry point class does not implement the <code>org.apache.flink.api.common.Program</code> interface,
the system will invoke the main method of the class.</p>
</li>
</ol>
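<p>The first step of this procedure can be sketched in plain Java with the JDK's <code>java.util.jar</code> classes.
This is an illustration under stated assumptions, not Flink's actual implementation: the class
<code>EntryPointSketch</code> and the method <code>resolveEntryPoint</code> are hypothetical, and treating the manually
passed class name purely as a fallback is an assumption based on the wording above.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.jar.Attributes;
import java.util.jar.Manifest;

public class EntryPointSketch {

    // Returns the entry point class name: program-class first, then main-class,
    // then the manually supplied name (assumed here to be a pure fallback).
    static String resolveEntryPoint(Manifest manifest, String manualClassName) {
        Attributes attributes = manifest.getMainAttributes();
        String programClass = attributes.getValue("program-class");
        if (programClass != null) {
            return programClass;
        }
        String mainClass = attributes.getValue("main-class");
        if (mainClass != null) {
            return mainClass;
        }
        // Neither attribute is present: use the name passed by the user (may be null).
        return manualClassName;
    }

    public static void main(String[] args) {
        Manifest manifest = new Manifest();
        // Placeholder class names for illustration only.
        manifest.getMainAttributes().putValue("main-class", "org.example.MainJob");
        manifest.getMainAttributes().putValue("program-class", "org.example.PlanJob");
        // Both attributes are set, so program-class takes precedence.
        System.out.println(resolveEntryPoint(manifest, null)); // prints org.example.PlanJob
    }
}</code></pre></div>
<p>Steps 2 and 3 then depend on whether the resolved class implements the <code>Program</code> interface; the sketch
leaves that check out.</p>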
<p><a href="#top">Back to top</a></p>
<h2 id="accumulators--counters">Accumulators &amp; Counters</h2>
<p>Accumulators are simple constructs with an <strong>add operation</strong> and a <strong>final accumulated result</strong>,
which is available after the job has ended.</p>
<p>The most straightforward accumulator is a <strong>counter</strong>: You can increment it using the
<code>Accumulator.add(V value)</code> method. At the end of the job Flink will sum up (merge) all partial
results and send the result to the client. Accumulators are useful during debugging or if you
quickly want to find out more about your data.</p>
<p>Flink currently has the following <strong>built-in accumulators</strong>. Each of them implements the
<a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java">Accumulator</a>
interface.</p>
<ul>
<li><a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java"><strong>IntCounter</strong></a>,
<a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java"><strong>LongCounter</strong></a>
and <a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java"><strong>DoubleCounter</strong></a>:
See below for an example using a counter.</li>
<li><a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java"><strong>Histogram</strong></a>:
A histogram implementation for a discrete number of bins. Internally it is just a map from Integer
to Integer. You can use this to compute distributions of values, e.g. the distribution of
words-per-line for a word count program.</li>
</ul>
<p><strong>How to use accumulators:</strong></p>
<p>First you have to create an accumulator object (here a counter) in the operator function where you
want to use it. Operator function here refers to the (anonymous inner) class implementing the user
defined code for an operator.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="n">IntCounter</span> <span class="n">numLines</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">IntCounter</span><span class="o">();</span></code></pre></div>
<p>Second you have to register the accumulator object, typically in the <code>open()</code> method of the
operator function. Here you also define the name.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">addAccumulator</span><span class="o">(</span><span class="s">&quot;num-lines&quot;</span><span class="o">,</span> <span class="k">this</span><span class="o">.</span><span class="na">numLines</span><span class="o">);</span></code></pre></div>
<p>You can now use the accumulator anywhere in the operator function, including in the <code>open()</code> and
<code>close()</code> methods.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="k">this</span><span class="o">.</span><span class="na">numLines</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span></code></pre></div>
<p>The overall result will be stored in the <code>JobExecutionResult</code> object which is returned when
running a job using the Java API (currently this only works if the execution waits for the
completion of the job).</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">myJobExecutionResult</span><span class="o">.</span><span class="na">getAccumulatorResult</span><span class="o">(</span><span class="s">&quot;num-lines&quot;</span><span class="o">)</span></code></pre></div>
<p>All accumulators share a single namespace per job. Thus you can use the same accumulator in
different operator functions of your job. Flink will internally merge all accumulators with the same
name.</p>
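<p>To make the merge step concrete, the following standalone sketch models how partial counter values from
several parallel instances could be combined into one result. It is a simplified stand-in and does not use
Flink's classes: <code>CounterMergeSketch</code>, its nested <code>Counter</code> class, and <code>mergeAll</code> are
hypothetical names, although <code>add</code>, <code>merge</code>, and <code>getLocalValue</code> are chosen to resemble
the methods of the <code>Accumulator</code> interface.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">public class CounterMergeSketch {

    // Simplified stand-in for a counter accumulator (not Flink's IntCounter class).
    static final class Counter {
        private int localValue;

        // Called by one parallel instance while it processes its share of the data.
        void add(int value) {
            localValue += value;
        }

        // Folds another partial result into this one.
        void merge(Counter other) {
            localValue += other.localValue;
        }

        int getLocalValue() {
            return localValue;
        }
    }

    // Merges the partial counters of all parallel instances into one final value.
    static int mergeAll(Counter[] partials) {
        Counter total = new Counter();
        for (Counter partial : partials) {
            total.merge(partial);
        }
        return total.getLocalValue();
    }

    public static void main(String[] args) {
        // Two parallel instances that registered a counter under the same name.
        Counter first = new Counter();
        Counter second = new Counter();
        first.add(1);
        first.add(1);
        second.add(1);
        System.out.println(mergeAll(new Counter[] {first, second})); // prints 3
    }
}</code></pre></div>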
<p>A note on accumulators and iterations: Currently the result of accumulators is only available after
the overall job has ended. We plan to also make the result of the previous iteration available in the
next iteration. You can use
<a href="https://github.com/apache/flink/blob/master//flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98">Aggregators</a>
to compute per-iteration statistics and base the termination of iterations on such statistics.</p>
<p><strong>Custom accumulators:</strong></p>
<p>To implement your own accumulator you simply have to write your implementation of the Accumulator
interface. Feel free to create a pull request if you think your custom accumulator should be shipped
with Flink.</p>
<p>You have the choice to implement either
<a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java">Accumulator</a>
or <a href="https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java">SimpleAccumulator</a>.</p>
<p><code>Accumulator&lt;V,R&gt;</code> is most flexible: It defines a type <code>V</code> for the value to add, and a
result type <code>R</code> for the final result. E.g. for a histogram, <code>V</code> is a number and <code>R</code> is
a histogram. <code>SimpleAccumulator</code> is for the cases where both types are the same, e.g. for counters.</p>
<p><a href="#top">Back to top</a></p>
<h2 id="parallel-execution">Parallel Execution</h2>
<p>This section describes how the parallel execution of programs can be configured in Flink. A Flink
program consists of multiple tasks (operators, data sources, and sinks). A task is split into
several parallel instances for execution and each parallel instance processes a subset of the task's
input data. The number of parallel instances of a task is called its <em>parallelism</em>.</p>
<p>The parallelism of a task can be specified in Flink on different levels.</p>
<h3 id="operator-level">Operator Level</h3>
<p>The parallelism of an individual operator, data source, or data sink can be defined by calling its
<code>setParallelism()</code> method. For example, the parallelism of the <code>Sum</code> operator in the
<a href="examples.html#word-count">WordCount</a> example program can be set to <code>5</code> as follows:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="o">[...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">text</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">LineSplitter</span><span class="o">())</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">5</span><span class="o">);</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Word Count Example&quot;</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="k">val</span> <span class="n">text</span> <span class="k">=</span> <span class="o">[</span><span class="kt">...</span><span class="o">]</span>
<span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">text</span>
<span class="o">.</span><span class="n">flatMap</span><span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span> <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span> <span class="o">}</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">setParallelism</span><span class="o">(</span><span class="mi">5</span><span class="o">)</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="n">print</span><span class="o">()</span>
<span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">&quot;Word Count Example&quot;</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h3 id="execution-environment-level">Execution Environment Level</h3>
<p>Flink programs are executed in the context of an <a href="#program-skeleton">execution environment</a>. An
execution environment defines a default parallelism for all operators, data sources, and data sinks
it executes. Execution environment parallelism can be overridden by explicitly configuring the
parallelism of an operator.</p>
<p>The default parallelism of an execution environment can be specified by calling the
<code>setParallelism()</code> method. To execute all operators, data sources, and data sinks of the
<a href="#example-program">WordCount</a> example program with a parallelism of <code>3</code>, set the default parallelism of the
execution environment as follows:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="o">[...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="o">[...]</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Word Count Example&quot;</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="n">env</span><span class="o">.</span><span class="n">setParallelism</span><span class="o">(</span><span class="mi">3</span><span class="o">)</span>
<span class="k">val</span> <span class="n">text</span> <span class="k">=</span> <span class="o">[</span><span class="kt">...</span><span class="o">]</span>
<span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">text</span>
<span class="o">.</span><span class="n">flatMap</span><span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span> <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span> <span class="o">}</span>
<span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="n">print</span><span class="o">()</span>
<span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">&quot;Word Count Example&quot;</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h3 id="client-level">Client Level</h3>
<p>The parallelism can be set at the Client when submitting jobs to Flink. The
Client can either be a Java or a Scala program. One example of such a Client is
Flink's Command-line Interface (CLI).</p>
<p>For the CLI client, the parallelism parameter can be specified with <code>-p</code>. For
example:</p>
<div class="highlight"><pre><code>./bin/flink run -p 10 ../examples/*WordCount-java*.jar
</code></pre></div>
<p>In a Java/Scala program, the parallelism is set as follows:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="k">try</span> <span class="o">{</span>
<span class="n">PackagedProgram</span> <span class="n">program</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">PackagedProgram</span><span class="o">(</span><span class="n">file</span><span class="o">,</span> <span class="n">args</span><span class="o">);</span>
<span class="n">InetSocketAddress</span> <span class="n">jobManagerAddress</span> <span class="o">=</span> <span class="n">RemoteExecutor</span><span class="o">.</span><span class="na">getInetFromHostport</span><span class="o">(</span><span class="s">&quot;localhost:6123&quot;</span><span class="o">);</span>
<span class="n">Configuration</span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Configuration</span><span class="o">();</span>
<span class="n">Client</span> <span class="n">client</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Client</span><span class="o">(</span><span class="n">jobManagerAddress</span><span class="o">,</span> <span class="n">config</span><span class="o">,</span> <span class="n">program</span><span class="o">.</span><span class="na">getUserCodeClassLoader</span><span class="o">());</span>
<span class="c1">// set the parallelism to 10 here</span>
<span class="n">client</span><span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">program</span><span class="o">,</span> <span class="mi">10</span><span class="o">,</span> <span class="kc">true</span><span class="o">);</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">ProgramInvocationException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
<span class="n">e</span><span class="o">.</span><span class="na">printStackTrace</span><span class="o">();</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">try</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">program</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">PackagedProgram</span><span class="o">(</span><span class="n">file</span><span class="o">,</span> <span class="n">args</span><span class="o">)</span>
<span class="k">val</span> <span class="n">jobManagerAddress</span> <span class="k">=</span> <span class="nc">RemoteExecutor</span><span class="o">.</span><span class="n">getInetFromHostport</span><span class="o">(</span><span class="s">&quot;localhost:6123&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">config</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Configuration</span><span class="o">()</span>
<span class="k">val</span> <span class="n">client</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Client</span><span class="o">(</span><span class="n">jobManagerAddress</span><span class="o">,</span> <span class="n">config</span><span class="o">,</span> <span class="n">program</span><span class="o">.</span><span class="n">getUserCodeClassLoader</span><span class="o">())</span>
<span class="c1">// set the parallelism to 10 here</span>
<span class="n">client</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">program</span><span class="o">,</span> <span class="mi">10</span><span class="o">,</span> <span class="kc">true</span><span class="o">)</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">{</span>
<span class="k">case</span> <span class="n">e</span><span class="k">:</span> <span class="kt">Exception</span> <span class="o">=&gt;</span> <span class="n">e</span><span class="o">.</span><span class="n">printStackTrace</span>
<span class="o">}</span></code></pre></div>
</div>
</div>
<h3 id="system-level">System Level</h3>
<p>A system-wide default parallelism for all execution environments can be defined by setting the
<code>parallelism.default</code> property in <code>./conf/flink-conf.yaml</code>. See the
<a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a> documentation for details.</p>
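<p>The four levels described in this section can be read as a fallback chain. The following standalone sketch
models that chain in plain Java and does not call any Flink API. The class <code>ParallelismSketch</code>, the
method <code>effectiveParallelism</code>, and the <code>UNSET</code> marker are hypothetical. The relative order of the
execution environment and client levels is an assumption; this section only states that an operator setting
overrides the environment setting and that the system-wide value is the default for all environments.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">public class ParallelismSketch {

    // Marker for "this level was not configured" (hypothetical, not a Flink constant).
    static final int UNSET = -1;

    // Returns the first configured value, checking the most specific level first.
    // Assumed order: operator, execution environment, client, system default.
    static int effectiveParallelism(int operator, int environment, int client, int systemDefault) {
        int[] levels = {operator, environment, client};
        for (int level : levels) {
            if (level != UNSET) {
                return level;
            }
        }
        return systemDefault;
    }

    public static void main(String[] args) {
        // Operator set to 5, environment set to 3: the operator setting wins.
        System.out.println(effectiveParallelism(5, 3, UNSET, 1)); // prints 5
        // Only the client passed -p 10: it replaces the system default.
        System.out.println(effectiveParallelism(UNSET, UNSET, 10, 1)); // prints 10
    }
}</code></pre></div>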
<p><a href="#top">Back to top</a></p>
<h2 id="execution-plans">Execution Plans</h2>
<p>Depending on various parameters such as data size or number of machines in the cluster, Flink's
optimizer automatically chooses an execution strategy for your program. In many cases, it can be
useful to know how exactly Flink will execute your program.</p>
<p><strong>Plan Visualization Tool</strong></p>
<p>Flink comes packaged with a visualization tool for execution plans. The HTML document containing
the visualizer is located under <code>tools/planVisualizer.html</code>. It takes a JSON representation of
the job execution plan and visualizes it as a graph with complete annotations of execution
strategies.</p>
<p>The following code shows how to print the execution plan JSON from your program:</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="o">...</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">env</span><span class="o">.</span><span class="na">getExecutionPlan</span><span class="o">());</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="o">...</span>
<span class="n">println</span><span class="o">(</span><span class="n">env</span><span class="o">.</span><span class="n">getExecutionPlan</span><span class="o">())</span></code></pre></div>
</div>
</div>
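<p>Since the visualizer expects the JSON to be pasted into a text field, it can be more convenient to write the plan to a file than to print it to the console. The following is a minimal sketch and not part of Flink: the class <code>PlanDump</code>, the method <code>writePlan</code>, and the file name <code>plan.json</code> are hypothetical names chosen for illustration, and the placeholder string only stands in for the value returned by <code>env.getExecutionPlan()</code>.</p>

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not a Flink class): stores a plan JSON string in a file.
class PlanDump {

    // Writes planJson to target as UTF-8, replacing any existing content,
    // and returns the path that was written.
    static Path writePlan(String planJson, Path target) throws IOException {
        Files.write(target, planJson.getBytes(StandardCharsets.UTF_8));
        return target;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder only: in a Flink program this string would be the
        // result of env.getExecutionPlan(), not this made-up JSON.
        String planJson = "{\"placeholder\": true}";

        Path written = writePlan(planJson, Paths.get("plan.json"));
        System.out.println("Plan written to " + written.toAbsolutePath());
    }
}
```

<p>In a Flink program, one would pass <code>env.getExecutionPlan()</code> as the first argument and then copy the contents of the resulting file into the visualizer.</p>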
<p>To visualize the execution plan, do the following:</p>
<ol>
<li><strong>Open</strong> <code>planVisualizer.html</code> with your web browser,</li>
<li><strong>Paste</strong> the JSON string into the text field, and</li>
<li><strong>Press</strong> the draw button.</li>
</ol>
<p>After these steps, a detailed execution plan will be visualized.</p>
<p><img alt="A Flink job execution graph." src="fig/plan_visualizer.png" width="80%" /></p>
<p><strong>Web Interface</strong></p>
<p>Flink offers a web interface for submitting and executing jobs. If you submit your packaged program through this interface, you can also view the plan visualization.</p>
<p>The script that starts the web interface is located at <code>bin/start-webclient.sh</code>. Once the web client is running (by default on <strong>port 8080</strong>), you can upload your program, which is then added to the list of available programs on the left side of the interface.</p>
<p>You can specify program arguments in the text box at the bottom of the page. If you check the plan visualization checkbox, the execution plan is shown before the program is executed.</p>
<p><a href="#top">Back to top</a></p>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script>
</body>
</html>