| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| <!DOCTYPE html> |
| |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> |
| |
| <title>Apache Flink 0.9.0 Documentation: Running Flink on YARN leveraging Tez</title> |
| |
| <link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> |
| |
| <!-- Bootstrap --> |
| <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css"> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| <body> |
| |
| |
| |
| |
| |
| |
| <!-- Top navbar. --> |
| <nav class="navbar navbar-default navbar-fixed-top"> |
| <div class="container"> |
| <!-- The logo. --> |
| <div class="navbar-header"> |
| <!-- Collapse toggle for the navigation links below (targets #bs-example-navbar-collapse-1). The sr-only text gives this icon-only button an accessible name. --> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1" aria-expanded="false" aria-controls="bs-example-navbar-collapse-1"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <div class="navbar-logo"> |
| <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a> |
| </div> |
| </div><!-- /.navbar-header --> |
| |
| <!-- The navigation links. --> |
| <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> |
| <ul class="nav navbar-nav"> |
| <li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li> |
| |
| <!-- Setup --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li> |
| |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li> |
| </ul> |
| </li> |
| |
| <!-- Programming Guides --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
| <li><a href="http://flink.apache.org/docs/0.9/scala_shell.html">Interactive Scala Shell</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li> |
| </ul> |
| </li> |
| |
| <!-- Libraries --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li> |
| </ul> |
| </li> |
| |
| <!-- Internals --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li> |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Internals</strong></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li> |
| </ul> |
| </li> |
| </ul> |
| <!-- Site search: sends the text field as the "q" parameter to search-results.html. The sr-only label names the field for assistive technology; the placeholder alone is not a label. --> |
| <form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html"> |
| <div class="form-group"> |
| <label class="sr-only" for="navbar-search-query">Search all pages</label> |
| <input type="text" class="form-control" id="navbar-search-query" name="q" placeholder="Search all pages"> |
| </div> |
| <button type="submit" class="btn btn-default">Search</button> |
| </form> |
| </div><!-- /.navbar-collapse --> |
| </div><!-- /.container --> |
| </nav> |
| |
| |
| |
| |
| <!-- Main content. --> |
| <div class="container"> |
| |
| |
| <div class="row"> |
| <div class="col-sm-10 col-sm-offset-1"> |
| <h1>Running Flink on YARN leveraging Tez</h1> |
| |
| |
| |
| <p><a href="#top"></a></p> |
| |
| <p>You can run Flink using Tez as an execution environment. Flink on Tez |
| is currently included in <em>flink-staging</em> in alpha. All classes are |
| located in the <em>org.apache.flink.tez</em> package.</p> |
| |
| <ul id="markdown-toc"> |
| <li><a href="#why-flink-on-tez" id="markdown-toc-why-flink-on-tez">Why Flink on Tez</a></li> |
| <li><a href="#local-execution" id="markdown-toc-local-execution">Local execution</a></li> |
| <li><a href="#yarn-execution" id="markdown-toc-yarn-execution">YARN execution</a> <ul> |
| <li><a href="#setup" id="markdown-toc-setup">Setup</a></li> |
| <li><a href="#packaging-your-program" id="markdown-toc-packaging-your-program">Packaging your program</a></li> |
| </ul> |
| </li> |
| <li><a href="#how-it-works" id="markdown-toc-how-it-works">How it works</a></li> |
| <li><a href="#limitations" id="markdown-toc-limitations">Limitations</a></li> |
| </ul> |
| |
| <h2 id="why-flink-on-tez">Why Flink on Tez</h2> |
| |
| <p><a href="http://tez.apache.org">Apache Tez</a> is a scalable data processing |
| platform. Tez provides an API for specifying a directed acyclic |
| graph (DAG), and functionality for placing the DAG vertices in YARN |
| containers, as well as data shuffling. In Flink’s architecture, |
| Tez is at about the same level as Flink’s network stack. While Flink’s |
| network stack focuses heavily on low latency in order to support |
| pipelining, data streaming, and iterative algorithms, Tez |
| focuses on scalability and elastic resource usage.</p> |
| |
| <p>Thus, by replacing Flink’s network stack with Tez, users can get scalability |
| and elastic resource usage in shared clusters while retaining Flink’s |
| APIs, optimizer, and runtime algorithms (local sorts, hash tables, etc).</p> |
| |
| <p>Flink programs can run almost unmodified using Tez as an execution |
| environment. Tez supports local execution (e.g., for debugging), and |
| remote execution on YARN.</p> |
| |
| <h2 id="local-execution">Local execution</h2> |
| |
| <p>The <code>LocalTezEnvironment</code> can be used to run programs using the local |
| mode provided by Tez. This example shows how WordCount can be run using the Tez local mode. |
| It is identical to a normal Flink WordCount, except that the <code>LocalTezEnvironment</code> is used. |
| To run in local Tez mode, you can simply run a Flink on Tez program |
| from your IDE (e.g., right click and run).</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountExample</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="kd">final</span> <span class="n">LocalTezEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">LocalTezEnvironment</span><span class="o">.</span><span class="na">create</span><span class="o">();</span> |
| |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span> |
| <span class="s">"Who's there?"</span><span class="o">,</span> |
| <span class="s">"I think I hear them. Stand, ho! Who's there?"</span><span class="o">);</span> |
| |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">text</span> |
| <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">LineSplitter</span><span class="o">())</span> |
| <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> |
| |
| <span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> |
| |
| <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">"Word Count Example"</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">LineSplitter</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="o">{</span> |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> <span class="o">{</span> |
| <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <h2 id="yarn-execution">YARN execution</h2> |
| |
| <h3 id="setup">Setup</h3> |
| |
| <ul> |
| <li> |
| <p>Install Tez on your Hadoop 2 cluster following the instructions from the |
| <a href="http://tez.apache.org/install.html">Apache Tez website</a>. If you are able to run |
| the examples that ship with Tez, then Tez has been successfully installed.</p> |
| </li> |
| <li> |
| <p>Currently, you need to build Flink yourself to obtain Flink on Tez |
| (the reason is a Hadoop version compatibility issue: Tez releases artifacts |
| on Maven central with a Hadoop 2.6.0 dependency). Build Flink |
| using <code>mvn -DskipTests clean package -Pinclude-tez -Dhadoop.version=X.X.X -Dtez.version=X.X.X</code>. |
| Make sure that the Hadoop version matches the version that Tez uses. |
| Obtain the jar file contained in the Flink distribution under |
| <code>flink-staging/flink-tez/target/flink-tez-x.y.z-flink-fat-jar.jar</code> |
| and upload it to some directory in HDFS. E.g., to upload the file |
| to the directory <code>/apps</code>, execute</p> |
| </li> |
| </ul> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>hadoop fs -put /path/to/flink-tez-x.y.z-flink-fat-jar.jar /apps</code></pre></div> |
| |
| <ul> |
| <li>Edit the tez-site.xml configuration file, adding an entry that points to the |
| location of the file. E.g., assuming that the file is in the directory <code>/apps/</code>, |
| add the following entry to tez-site.xml:</li> |
| </ul> |
| |
| <div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;property&gt;</span> |
| <span class="nt">&lt;name&gt;</span>tez.aux.uris<span class="nt">&lt;/name&gt;</span> |
| <span class="nt">&lt;value&gt;</span>${fs.default.name}/apps/flink-tez-x.y.z-flink-fat-jar.jar<span class="nt">&lt;/value&gt;</span> |
| <span class="nt">&lt;/property&gt;</span></code></pre></div> |
| |
| <ul> |
| <li>At this point, you should be able to run the pre-packaged examples, e.g., run WordCount:</li> |
| </ul> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>hadoop jar /path/to/flink-tez-x.y.z-flink-fat-jar.jar wc hdfs:/path/to/text hdfs:/path/to/output</code></pre></div> |
| |
| <h3 id="packaging-your-program">Packaging your program</h3> |
| |
| <p>Application packaging is currently a bit different than in Flink standalone mode. |
| Flink programs that run on Tez need to be packaged in a “fat jar” |
| file that contains the Flink client. This jar can then be executed via the <code>hadoop jar</code> command. |
| An easy way to do that is to use the provided <code>flink-tez-quickstart</code> maven archetype. |
| Create a new project as</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>mvn archetype:generate <span class="se">\</span> |
| -DarchetypeGroupId<span class="o">=</span>org.apache.flink <span class="se">\</span> |
| -DarchetypeArtifactId<span class="o">=</span>flink-tez-quickstart <span class="se">\</span> |
| -DarchetypeVersion<span class="o">=</span>0.9.0</code></pre></div> |
| |
| <p>and specify the group id, artifact id, version, and package of your project. For example, |
| let us assume the following options: <code>org.myorganization</code>, <code>flink-on-tez</code>, <code>0.1</code>, and <code>org.myorganization</code>. |
| You should see the following output on your terminal:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>mvn archetype:generate -DarchetypeGroupId<span class="o">=</span>org.apache.flink -DarchetypeArtifactId<span class="o">=</span>flink-tez-quickstart |
| <span class="o">[</span>INFO<span class="o">]</span> Scanning <span class="k">for</span> projects... |
| <span class="o">[</span>INFO<span class="o">]</span> |
| <span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------ |
| <span class="o">[</span>INFO<span class="o">]</span> Building Maven Stub Project <span class="o">(</span>No POM<span class="o">)</span> 1 |
| <span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------ |
| <span class="o">[</span>INFO<span class="o">]</span> |
| <span class="o">[</span>INFO<span class="o">]</span> >>> maven-archetype-plugin:2.2:generate <span class="o">(</span>default-cli<span class="o">)</span> > generate-sources @ standalone-pom >>> |
| <span class="o">[</span>INFO<span class="o">]</span> |
| <span class="o">[</span>INFO<span class="o">]</span> <span class="o"><<<</span> maven-archetype-plugin:2.2:generate <span class="o">(</span>default-cli<span class="o">)</span> < generate-sources @ standalone-pom <span class="o"><<<</span> |
| <span class="o">[</span>INFO<span class="o">]</span> |
| <span class="o">[</span>INFO<span class="o">]</span> --- maven-archetype-plugin:2.2:generate <span class="o">(</span>default-cli<span class="o">)</span> @ standalone-pom --- |
| <span class="o">[</span>INFO<span class="o">]</span> Generating project in Interactive mode |
| <span class="o">[</span>INFO<span class="o">]</span> Archetype <span class="o">[</span>org.apache.flink:flink-tez-quickstart:0.9-SNAPSHOT<span class="o">]</span> found in catalog <span class="nb">local</span> |
| <span class="nb"> </span>Define value <span class="k">for</span> property <span class="s1">'groupId'</span>: : org.myorganization |
| Define value <span class="k">for</span> property <span class="s1">'artifactId'</span>: : flink-on-tez |
| Define value <span class="k">for</span> property <span class="s1">'version'</span>: 1.0-SNAPSHOT: : 0.1 |
| Define value <span class="k">for</span> property <span class="s1">'package'</span>: org.myorganization: : |
| Confirm properties configuration: |
| groupId: org.myorganization |
| artifactId: flink-on-tez |
| version: 0.1 |
| package: org.myorganization |
| Y: : Y |
| <span class="o">[</span>INFO<span class="o">]</span> ---------------------------------------------------------------------------- |
| <span class="o">[</span>INFO<span class="o">]</span> Using following parameters <span class="k">for</span> creating project from Archetype: flink-tez-quickstart:0.9-SNAPSHOT |
| <span class="o">[</span>INFO<span class="o">]</span> ---------------------------------------------------------------------------- |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: groupId, Value: org.myorganization |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: artifactId, Value: flink-on-tez |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: version, Value: 0.1 |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: package, Value: org.myorganization |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: packageInPathFormat, Value: org/myorganization |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: package, Value: org.myorganization |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: version, Value: 0.1 |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: groupId, Value: org.myorganization |
| <span class="o">[</span>INFO<span class="o">]</span> Parameter: artifactId, Value: flink-on-tez |
| <span class="o">[</span>INFO<span class="o">]</span> project created from Archetype in dir: /Users/kostas/Dropbox/flink-tez-quickstart-test/flink-on-tez |
| <span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------ |
| <span class="o">[</span>INFO<span class="o">]</span> BUILD SUCCESS |
| <span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------ |
| <span class="o">[</span>INFO<span class="o">]</span> Total <span class="nb">time</span>: 44.130 s |
| <span class="o">[</span>INFO<span class="o">]</span> Finished at: 2015-02-26T17:59:45+01:00 |
| <span class="o">[</span>INFO<span class="o">]</span> Final Memory: 15M/309M |
| <span class="o">[</span>INFO<span class="o">]</span> ------------------------------------------------------------------------</code></pre></div> |
| |
| <p>The project contains an example called <code>YarnJob.java</code> that provides the skeleton |
| for a Flink-on-Tez job. Program execution is currently done using Hadoop’s <code>ProgramDriver</code>, |
| see the <code>Driver.java</code> class for an example. Create the fat jar using |
| <code>mvn -DskipTests clean package</code>. The resulting jar will be located in the <code>target/</code> directory. |
| You can now execute a job as follows:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>mvn -DskipTests clean package |
| <span class="nv">$ </span>hadoop jar flink-on-tez/target/flink-on-tez-0.1-flink-fat-jar.jar yarnjob <span class="o">[</span><span class="nb">command</span>-line parameters<span class="o">]</span></code></pre></div> |
| |
| <p>Flink programs that run on YARN using Tez as an execution engine need to use the <code>RemoteTezEnvironment</code> and |
| register the class that contains the <code>main</code> method with that environment:</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountExample</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="kd">final</span> <span class="n">RemoteTezEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">RemoteTezEnvironment</span><span class="o">.</span><span class="na">create</span><span class="o">();</span> |
| |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span> |
| <span class="s">"Who's there?"</span><span class="o">,</span> |
| <span class="s">"I think I hear them. Stand, ho! Who's there?"</span><span class="o">);</span> |
| |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">text</span> |
| <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">LineSplitter</span><span class="o">())</span> |
| <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> |
| |
| <span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> |
| |
| <span class="n">env</span><span class="o">.</span><span class="na">registerMainClass</span><span class="o">(</span><span class="n">WordCountExample</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">"Word Count Example"</span><span class="o">);</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">LineSplitter</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="o">{</span> |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> <span class="o">{</span> |
| <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <h2 id="how-it-works">How it works</h2> |
| |
| <p>Flink on Tez reuses the Flink APIs, the Flink optimizer, |
| and the Flink local runtime, including Flink’s hash table and sort implementations. Tez |
| replaces Flink’s network stack and control plane, and is responsible for scheduling and |
| network shuffles.</p> |
| |
| <p>The figure below shows how a Flink program passes through the Flink stack and generates |
| a Tez DAG (instead of a JobGraph that would be created using normal Flink execution).</p> |
| |
| <!-- Figure: translation of a Flink program into a Tez DAG. The wrapper div centers the image; the vertical margins replace the obsolete vspace attribute. --> |
| <div style="text-align: center;"> |
| <img src="fig/flink_on_tez_translation.png" alt="Translation of a Flink program to a Tez DAG." height="600" style="margin-top: 20px; margin-bottom: 20px;"> |
| </div> |
| |
| <p>All local processing, including memory management, sorting, and hashing is performed by |
| Flink as usual. Local processing is encapsulated in Tez vertices, as seen in the figure |
| below. Tez vertices are connected by edges. Tez is currently based on a key-value data |
| model. In the current implementation, the elements that are processed by Flink operators |
| are wrapped inside Tez values, and the Tez key field is used to indicate the index of the target task |
| that the elements are destined to.</p> |
| |
| <!-- Figure: Flink runtime encapsulated inside Tez vertices. The wrapper div centers the image; the vertical margins replace the obsolete vspace attribute. --> |
| <div style="text-align: center;"> |
| <img src="fig/flink_tez_vertex.png" alt="Encapsulation of Flink runtime inside Tez vertices." height="200" style="margin-top: 20px; margin-bottom: 20px;"> |
| </div> |
| |
| <h2 id="limitations">Limitations</h2> |
| |
| <p>Currently, Flink on Tez does not support all features of the Flink API. We are working |
| to enable all of the missing features listed below. In the meantime, if your project depends on these features, we suggest |
| using <a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">Flink on YARN</a> or <a href="http://flink.apache.org/docs/0.9/quickstart/setup_quickstart.html">Flink standalone</a>.</p> |
| |
| <p>The following features are currently missing.</p> |
| |
| <ul> |
| <li> |
| <p>Dedicated client: jobs need to be submitted via Hadoop’s command-line client</p> |
| </li> |
| <li> |
| <p>Self-joins: currently binary operators that receive the same input are not supported due to |
| <a href="https://issues.apache.org/jira/browse/TEZ-1190">TEZ-1190</a>.</p> |
| </li> |
| <li> |
| <p>Iterative programs are currently not supported.</p> |
| </li> |
| <li> |
| <p>Broadcast variables are currently not supported.</p> |
| </li> |
| <li> |
| <p>Accumulators and counters are currently not supported.</p> |
| </li> |
| <li> |
| <p>Performance: The current implementation has not been heavily tested for performance, and misses several optimizations, |
| including task chaining.</p> |
| </li> |
| <li> |
| <p>Streaming API: Streaming programs will not currently compile to Tez DAGs.</p> |
| </li> |
| <li> |
| <p>Scala API: The current implementation has only been tested with the Java API.</p> |
| </li> |
| </ul> |
| |
| |
| </div> |
| |
| <div class="col-sm-10 col-sm-offset-1"> |
| <!-- Disqus thread and some vertical offset --> |
| <div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div> |
| </div> |
| </div> |
| |
| </div><!-- /.container --> |
| |
| <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> |
| <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script> |
| <!-- Include all compiled plugins (below), or include individual files as needed --> |
| <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script> |
| <script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script> |
| |
| <!-- Google Analytics --> |
| <script> |
| // Google Analytics loader. Stores the name 'ga' under window.GoogleAnalyticsObject |
| // and, unless window.ga already exists, defines it as a stub that pushes each |
| // call's arguments onto ga.q. It then records the current time in ga.l, creates |
| // a script element with async set and src '//www.google-analytics.com/analytics.js', |
| // and inserts it before the first script element in the document. |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); |
| |
| // Issue a 'create' command with ID 'UA-52545728-1' and 'auto', then a 'send' |
| // command for a 'pageview'. |
| ga('create', 'UA-52545728-1', 'auto'); |
| ga('send', 'pageview'); |
| </script> |
| |
| <!-- Disqus --> |
| <script type="text/javascript"> |
| // Disqus shortname; declared as a global variable and used below to build the embed URL. |
| var disqus_shortname = 'stratosphere-eu'; |
| |
| // Create an async script element pointing at the Disqus embed.js for the |
| // shortname above and append it to the first head element, or to the first |
| // body element when no head element is found. |
| (function() { |
| var embedScript = document.createElement('script'); |
| embedScript.type = 'text/javascript'; |
| embedScript.async = true; |
| embedScript.src = '//' + disqus_shortname + '.disqus.com/embed.js'; |
| |
| var headElement = document.getElementsByTagName('head')[0]; |
| var mountPoint = headElement ? headElement : document.getElementsByTagName('body')[0]; |
| mountPoint.appendChild(embedScript); |
| })(); |
| </script> |
| </body> |
| </html> |