blob: 23ba5bfd811114c8bbd48c0847627f077c07c64c [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.10-SNAPSHOT Documentation: Running Flink on YARN leveraging Tez</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/master/apis/scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Running Flink on YARN leveraging Tez</h1>
<p><a href="#top"></a></p>
<p>You can run Flink using Tez as an execution environment. Flink on Tez
is currently included in <em>flink-staging</em> in alpha. All classes are
located in the <em>org.apache.flink.tez</em> package.</p>
<ul id="markdown-toc">
<li><a href="#why-flink-on-tez" id="markdown-toc-why-flink-on-tez">Why Flink on Tez</a></li>
<li><a href="#local-execution" id="markdown-toc-local-execution">Local execution</a></li>
<li><a href="#yarn-execution" id="markdown-toc-yarn-execution">YARN execution</a> <ul>
<li><a href="#setup" id="markdown-toc-setup">Setup</a></li>
<li><a href="#packaging-your-program" id="markdown-toc-packaging-your-program">Packaging your program</a></li>
</ul>
</li>
<li><a href="#how-it-works" id="markdown-toc-how-it-works">How it works</a></li>
<li><a href="#limitations" id="markdown-toc-limitations">Limitations</a></li>
</ul>
<h2 id="why-flink-on-tez">Why Flink on Tez</h2>
<p><a href="http://tez.apache.org">Apache Tez</a> is a scalable data processing
platform. Tez provides an API for specifying a directed acyclic
graph (DAG), and functionality for placing the DAG vertices in YARN
containers, as well as data shuffling. In Flink’s architecture,
Tez is at about the same level as Flink’s network stack. While Flink’s
network stack focuses heavily on low latency in order to support
pipelining, data streaming, and iterative algorithms, Tez
focuses on scalability and elastic resource usage.</p>
<p>Thus, by replacing Flink’s network stack with Tez, users can get scalability
and elastic resource usage in shared clusters while retaining Flink’s
APIs, optimizer, and runtime algorithms (local sorts, hash tables, etc).</p>
<p>Flink programs can run almost unmodified using Tez as an execution
environment. Tez supports local execution (e.g., for debugging), and
remote execution on YARN.</p>
<h2 id="local-execution">Local execution</h2>
<p>The <code>LocalTezEnvironment</code> can be used to run programs using the local
mode provided by Tez. This example shows how WordCount can be run using the Tez local mode.
It is identical to a normal Flink WordCount, except that the <code>LocalTezEnvironment</code> is used.
To run in local Tez mode, you can simply run a Flink on Tez program
from your IDE (e.g., right click and run).</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountExample</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="kd">final</span> <span class="n">LocalTezEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">LocalTezEnvironment</span><span class="o">.</span><span class="na">create</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
<span class="s">&quot;Who&#39;s there?&quot;</span><span class="o">,</span>
<span class="s">&quot;I think I hear them. Stand, ho! Who&#39;s there?&quot;</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">text</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">LineSplitter</span><span class="o">())</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Word Count Example&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">LineSplitter</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<h2 id="yarn-execution">YARN execution</h2>
<h3 id="setup">Setup</h3>
<ul>
<li>
<p>Install Tez on your Hadoop 2 cluster following the instructions from the
<a href="http://tez.apache.org/install.html">Apache Tez website</a>. If you are able to run
the examples that ship with Tez, then Tez has been successfully installed.</p>
</li>
<li>
<p>Currently, you need to build Flink yourself to obtain Flink on Tez
(the reason is a Hadoop version compatibility issue: Tez releases artifacts
on Maven central with a Hadoop 2.6.0 dependency). Build Flink
using <code>mvn -DskipTests clean package -Pinclude-tez -Dhadoop.version=X.X.X -Dtez.version=X.X.X</code>.
Make sure that the Hadoop version matches the version that Tez uses.
Obtain the jar file contained in the Flink distribution under
<code>flink-staging/flink-tez/target/flink-tez-x.y.z-flink-fat-jar.jar</code>
and upload it to some directory in HDFS. E.g., to upload the file
to the directory <code>/apps</code>, execute</p>
</li>
</ul>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>hadoop fs -put /path/to/flink-tez-x.y.z-flink-fat-jar.jar /apps</code></pre></div>
<ul>
<li>Edit the tez-site.xml configuration file, adding an entry that points to the
location of the file. E.g., assuming that the file is in the directory <code>/apps/</code>,
add the following entry to tez-site.xml:</li>
</ul>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;property&gt;</span>
<span class="nt">&lt;name&gt;</span>tez.aux.uris<span class="nt">&lt;/name&gt;</span>
<span class="nt">&lt;value&gt;</span>${fs.default.name}/apps/flink-tez-x.y.z-flink-fat-jar.jar<span class="nt">&lt;/value&gt;</span>
<span class="nt">&lt;/property&gt;</span></code></pre></div>
<ul>
<li>At this point, you should be able to run the pre-packaged examples, e.g., run WordCount:</li>
</ul>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>hadoop jar /path/to/flink-tez-x.y.z-flink-fat-jar.jar wc hdfs:/path/to/text hdfs:/path/to/output</code></pre></div>
<h3 id="packaging-your-program">Packaging your program</h3>
<p>Application packaging is currently a bit different than in Flink standalone mode.
Flink programs that run on Tez need to be packaged in a “fat jar”
file that contains the Flink client. This jar can then be executed via the <code>hadoop jar</code> command.
An easy way to do that is to use the provided <code>flink-tez-quickstart</code> maven archetype.
Create a new project as</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>mvn archetype:generate <span class="se">\</span>
-DarchetypeGroupId<span class="o">=</span>org.apache.flink <span class="se">\</span>
-DarchetypeArtifactId<span class="o">=</span>flink-tez-quickstart <span class="se">\</span>
-DarchetypeVersion<span class="o">=</span>0.10-SNAPSHOT</code></pre></div>
<p>and specify the group id, artifact id, version, and package of your project. For example,
let us assume the following options: <code>org.myorganization</code>, <code>flink-on-tez</code>, <code>0.1</code>, and <code>org.myorganization</code>.
You should see the following output on your terminal:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>mvn archetype:generate -DarchetypeGroupId<span class="o">=</span>org.apache.flink -DarchetypeArtifactId<span class="o">=</span>flink-tez-quickstart
<span class="o">[</span>INFO<span class="o">]</span> Scanning <span class="k">for</span> projects...
<span class="o">[</span>INFO<span class="o">]</span>
<span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------
<span class="o">[</span>INFO<span class="o">]</span> Building Maven Stub Project <span class="o">(</span>No POM<span class="o">)</span> 1
<span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------
<span class="o">[</span>INFO<span class="o">]</span>
<span class="o">[</span>INFO<span class="o">]</span> &gt;&gt;&gt; maven-archetype-plugin:2.2:generate <span class="o">(</span>default-cli<span class="o">)</span> &gt; generate-sources @ standalone-pom &gt;&gt;&gt;
<span class="o">[</span>INFO<span class="o">]</span>
<span class="o">[</span>INFO<span class="o">]</span> <span class="o">&lt;&lt;&lt;</span> maven-archetype-plugin:2.2:generate <span class="o">(</span>default-cli<span class="o">)</span> &lt; generate-sources @ standalone-pom <span class="o">&lt;&lt;&lt;</span>
<span class="o">[</span>INFO<span class="o">]</span>
<span class="o">[</span>INFO<span class="o">]</span> --- maven-archetype-plugin:2.2:generate <span class="o">(</span>default-cli<span class="o">)</span> @ standalone-pom ---
<span class="o">[</span>INFO<span class="o">]</span> Generating project in Interactive mode
<span class="o">[</span>INFO<span class="o">]</span> Archetype <span class="o">[</span>org.apache.flink:flink-tez-quickstart:0.9-SNAPSHOT<span class="o">]</span> found in catalog <span class="nb">local</span>
<span class="nb"> </span>Define value <span class="k">for</span> property <span class="s1">&#39;groupId&#39;</span>: : org.myorganization
Define value <span class="k">for</span> property <span class="s1">&#39;artifactId&#39;</span>: : flink-on-tez
Define value <span class="k">for</span> property <span class="s1">&#39;version&#39;</span>: 1.0-SNAPSHOT: : 0.1
Define value <span class="k">for</span> property <span class="s1">&#39;package&#39;</span>: org.myorganization: :
Confirm properties configuration:
groupId: org.myorganization
artifactId: flink-on-tez
version: 0.1
package: org.myorganization
Y: : Y
<span class="o">[</span>INFO<span class="o">]</span> ----------------------------------------------------------------------------
<span class="o">[</span>INFO<span class="o">]</span> Using following parameters <span class="k">for</span> creating project from Archetype: flink-tez-quickstart:0.9-SNAPSHOT
<span class="o">[</span>INFO<span class="o">]</span> ----------------------------------------------------------------------------
<span class="o">[</span>INFO<span class="o">]</span> Parameter: groupId, Value: org.myorganization
<span class="o">[</span>INFO<span class="o">]</span> Parameter: artifactId, Value: flink-on-tez
<span class="o">[</span>INFO<span class="o">]</span> Parameter: version, Value: 0.1
<span class="o">[</span>INFO<span class="o">]</span> Parameter: package, Value: org.myorganization
<span class="o">[</span>INFO<span class="o">]</span> Parameter: packageInPathFormat, Value: org/myorganization
<span class="o">[</span>INFO<span class="o">]</span> Parameter: package, Value: org.myorganization
<span class="o">[</span>INFO<span class="o">]</span> Parameter: version, Value: 0.1
<span class="o">[</span>INFO<span class="o">]</span> Parameter: groupId, Value: org.myorganization
<span class="o">[</span>INFO<span class="o">]</span> Parameter: artifactId, Value: flink-on-tez
<span class="o">[</span>INFO<span class="o">]</span> project created from Archetype in dir: /Users/kostas/Dropbox/flink-tez-quickstart-test/flink-on-tez
<span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------
<span class="o">[</span>INFO<span class="o">]</span> BUILD SUCCESS
<span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------
<span class="o">[</span>INFO<span class="o">]</span> Total <span class="nb">time</span>: 44.130 s
<span class="o">[</span>INFO<span class="o">]</span> Finished at: 2015-02-26T17:59:45+01:00
<span class="o">[</span>INFO<span class="o">]</span> Final Memory: 15M/309M
<span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------</code></pre></div>
<p>The project contains an example called <code>YarnJob.java</code> that provides the skeleton
for a Flink-on-Tez job. Program execution is currently done using Hadoop’s <code>ProgramDriver</code>,
see the <code>Driver.java</code> class for an example. Create the fat jar using
<code>mvn -DskipTests clean package</code>. The resulting jar will be located in the <code>target/</code> directory.
You can now execute a job as follows:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>mvn -DskipTests clean package
<span class="nv">$ </span>hadoop jar flink-on-tez/target/flink-on-tez-0.1-flink-fat-jar.jar yarnjob <span class="o">[</span><span class="nb">command</span>-line parameters<span class="o">]</span></code></pre></div>
<p>Flink programs that run on YARN using Tez as an execution engine need to use the <code>RemoteTezEnvironment</code> and
register the class that contains the <code>main</code> method with that environment:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountExample</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="kd">final</span> <span class="n">RemoteTezEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">RemoteTezEnvironment</span><span class="o">.</span><span class="na">create</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
<span class="s">&quot;Who&#39;s there?&quot;</span><span class="o">,</span>
<span class="s">&quot;I think I hear them. Stand, ho! Who&#39;s there?&quot;</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">text</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">LineSplitter</span><span class="o">())</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
<span class="n">env</span><span class="o">.</span><span class="na">registerMainClass</span><span class="o">(</span><span class="n">WordCountExample</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Word Count Example&quot;</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">LineSplitter</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<h2 id="how-it-works">How it works</h2>
<p>Flink on Tez reuses the Flink APIs, the Flink optimizer,
and the Flink local runtime, including Flink’s hash table and sort implementations. Tez
replaces Flink’s network stack and control plane, and is responsible for scheduling and
network shuffles.</p>
<p>The figure below shows how a Flink program passes through the Flink stack and generates
a Tez DAG (instead of a JobGraph that would be created using normal Flink execution).</p>
<div style="text-align: center;">
<img src="fig/flink_on_tez_translation.png" alt="Translation of a Flink program to a Tez DAG." height="600px" vspace="20px" style="text-align: center;" />
</div>
<p>All local processing, including memory management, sorting, and hashing is performed by
Flink as usual. Local processing is encapsulated in Tez vertices, as seen in the figure
below. Tez vertices are connected by edges. Tez is currently based on a key-value data
model. In the current implementation, the elements that are processed by Flink operators
are wrapped inside Tez values, and the Tez key field is used to indicate the index of the target task
that the elements are destined to.</p>
<div style="text-align: center;">
<img src="fig/flink_tez_vertex.png" alt="Encapsulation of Flink runtime inside Tez vertices." height="200px" vspace="20px" style="text-align: center;" />
</div>
<h2 id="limitations">Limitations</h2>
<p>Currently, Flink on Tez does not support all features of the Flink API. We are working
to enable all of the missing features listed below. In the meantime, if your project depends on these features, we suggest
using <a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">Flink on YARN</a> or <a href="http://flink.apache.org/docs/master/quickstart/setup_quickstart.html">Flink standalone</a>.</p>
<p>The following features are currently missing.</p>
<ul>
<li>
<p>Dedicated client: jobs need to be submitted via Hadoop’s command-line client</p>
</li>
<li>
<p>Self-joins: currently binary operators that receive the same input are not supported due to
<a href="https://issues.apache.org/jira/browse/TEZ-1190">TEZ-1190</a>.</p>
</li>
<li>
<p>Iterative programs are currently not supported.</p>
</li>
<li>
<p>Broadcast variables are currently not supported.</p>
</li>
<li>
<p>Accumulators and counters are currently not supported.</p>
</li>
<li>
<p>Performance: The current implementation has not been heavily tested for performance, and misses several optimizations,
including task chaining.</p>
</li>
<li>
<p>Streaming API: Streaming programs will not currently compile to Tez DAGs.</p>
</li>
<li>
<p>Scala API: The current implementation has only been tested with the Java API.</p>
</li>
</ul>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
// Analytics loader. The function below is invoked immediately with
//   i = window, s = document, o = 'script',
//   g = '//www.google-analytics.com/analytics.js', r = 'ga'
// (a and m are not passed and serve as local scratch variables). It:
//   - records the name 'ga' in window['GoogleAnalyticsObject'];
//   - defines window.ga, unless it already exists, as a stub that appends
//     the arguments of every call to the array window.ga.q;
//   - stores the current time as a number of milliseconds in window.ga.l;
//   - creates a script element with async set and src = g, and inserts it
//     before the first script element found in the document.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
// Both calls go through window.ga as set up above; while the stub is still in
// place their arguments are simply queued in window.ga.q.
// NOTE(review): 'UA-52545728-1' is presumably the analytics property ID for
// this site - confirm before reusing this page template elsewhere.
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
// Disqus comment thread loader.
// `disqus_shortname` stays a top-level `var` (i.e. a global), exactly as before.
// NOTE(review): presumably the loaded embed.js reads this global - keep it global.
var disqus_shortname = 'stratosphere-eu';
(function () {
  // Build the script element that pulls in the embed code for the forum above.
  var embedScript = document.createElement('script');
  embedScript.type = 'text/javascript';
  embedScript.async = true;
  embedScript.src = '//' + disqus_shortname + '.disqus.com/embed.js';

  // Attach to the first head element, or to the first body element when
  // the document has no head.
  var mountPoint = document.getElementsByTagName('head')[0] ||
      document.getElementsByTagName('body')[0];
  mountPoint.appendChild(embedScript);
})();
</script>
</body>
</html>