---
title: "Running Flink on YARN leveraging Tez"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<a href="#top"></a>

You can run Flink using Tez as an execution environment. Flink on Tez
is currently included in *flink-staging* and is in alpha status. All classes are
located in the *org.apache.flink.tez* package.

* This will be replaced by the TOC
{:toc}

## Why Flink on Tez

[Apache Tez](http://tez.apache.org) is a scalable data processing
platform. Tez provides an API for specifying a directed acyclic
graph (DAG), and functionality for placing the DAG vertices in YARN
containers, as well as data shuffling. In Flink's architecture,
Tez sits at roughly the same level as Flink's network stack. While Flink's
network stack focuses heavily on low latency in order to support
pipelining, data streaming, and iterative algorithms, Tez
focuses on scalability and elastic resource usage.

Thus, by replacing Flink's network stack with Tez, users can get scalability
and elastic resource usage in shared clusters while retaining Flink's
APIs, optimizer, and runtime algorithms (local sorts, hash tables, etc.).

Flink programs can run almost unmodified using Tez as an execution
environment. Tez supports local execution (e.g., for debugging) and
remote execution on YARN.


## Local execution

The `LocalTezEnvironment` can be used to run programs using the local
mode provided by Tez. The following example shows how WordCount can be run using the Tez local mode.
It is identical to a normal Flink WordCount, except that the `LocalTezEnvironment` is used.
To run in local Tez mode, you can simply run a Flink on Tez program
from your IDE (e.g., right click and run).

{% highlight java %}
public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final LocalTezEnvironment env = LocalTezEnvironment.create();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();

        env.execute("Word Count Example");
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
{% endhighlight %}

## YARN execution

### Setup

- Install Tez on your Hadoop 2 cluster following the instructions from the
[Apache Tez website](http://tez.apache.org/install.html). If you are able to run
the examples that ship with Tez, then Tez has been successfully installed.

- Currently, you need to build Flink yourself to obtain Flink on Tez
(the reason is Hadoop version compatibility: Tez releases artifacts
on Maven central with a Hadoop 2.6.0 dependency). Build Flink
using `mvn -DskipTests clean package -Pinclude-tez -Dhadoop.version=X.X.X -Dtez.version=X.X.X`.
Make sure that the Hadoop version matches the version that Tez uses.
Obtain the jar file contained in the Flink distribution under
`flink-staging/flink-tez/target/flink-tez-x.y.z-flink-fat-jar.jar`
and upload it to some directory in HDFS. E.g., to upload the file
to the directory `/apps`, execute
{% highlight bash %}
$ hadoop fs -put /path/to/flink-tez-x.y.z-flink-fat-jar.jar /apps
{% endhighlight %}

- Edit the tez-site.xml configuration file, adding an entry that points to the
location of the jar file. E.g., assuming that the file is in the directory `/apps/`,
add the following entry to tez-site.xml:
{% highlight xml %}
<property>
  <name>tez.aux.uris</name>
  <value>${fs.default.name}/apps/flink-tez-x.y.z-flink-fat-jar.jar</value>
</property>
{% endhighlight %}

- At this point, you should be able to run the pre-packaged examples, e.g., run WordCount:
{% highlight bash %}
$ hadoop jar /path/to/flink-tez-x.y.z-flink-fat-jar.jar wc hdfs:/path/to/text hdfs:/path/to/output
{% endhighlight %}


### Packaging your program

Application packaging is currently a bit different than in Flink standalone mode.
Flink programs that run on Tez need to be packaged in a "fat jar"
file that contains the Flink client. This jar can then be executed via the `hadoop jar` command.
An easy way to do that is to use the provided `flink-tez-quickstart` Maven archetype.
Create a new project as follows:

{% highlight bash %}
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-tez-quickstart \
    -DarchetypeVersion={{site.version}}
{% endhighlight %}

and specify the group id, artifact id, version, and package of your project. For example,
let us assume the following options: `org.myorganization`, `flink-on-tez`, `0.1`, and `org.myorganization`.
You should see the following output on your terminal:

{% highlight bash %}
$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-tez-quickstart
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype [org.apache.flink:flink-tez-quickstart:0.9-SNAPSHOT] found in catalog local
Define value for property 'groupId': : org.myorganization
Define value for property 'artifactId': : flink-on-tez
Define value for property 'version': 1.0-SNAPSHOT: : 0.1
Define value for property 'package': org.myorganization: :
Confirm properties configuration:
groupId: org.myorganization
artifactId: flink-on-tez
version: 0.1
package: org.myorganization
Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: flink-tez-quickstart:0.9-SNAPSHOT
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.myorganization
[INFO] Parameter: artifactId, Value: flink-on-tez
[INFO] Parameter: version, Value: 0.1
[INFO] Parameter: package, Value: org.myorganization
[INFO] Parameter: packageInPathFormat, Value: org/myorganization
[INFO] Parameter: package, Value: org.myorganization
[INFO] Parameter: version, Value: 0.1
[INFO] Parameter: groupId, Value: org.myorganization
[INFO] Parameter: artifactId, Value: flink-on-tez
[INFO] project created from Archetype in dir: /Users/kostas/Dropbox/flink-tez-quickstart-test/flink-on-tez
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 44.130 s
[INFO] Finished at: 2015-02-26T17:59:45+01:00
[INFO] Final Memory: 15M/309M
[INFO] ------------------------------------------------------------------------
{% endhighlight %}

The project contains an example called `YarnJob.java` that provides the skeleton
for a Flink-on-Tez job. Program execution is currently done using Hadoop's `ProgramDriver`;
see the `Driver.java` class for an example, and the sketch after the commands below.
Create the fat jar using `mvn -DskipTests clean package`. The resulting jar will be located
in the `target/` directory. You can now execute a job as follows:

{% highlight bash %}
$ mvn -DskipTests clean package
$ hadoop jar flink-on-tez/target/flink-on-tez-0.1-flink-fat-jar.jar yarnjob [command-line parameters]
{% endhighlight %}
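
For reference, here is a minimal sketch of what such a `ProgramDriver`-based entry point can look like. The class and program names (`Driver`, `yarnjob`, `YarnJob`) are illustrative; the `Driver.java` generated by the quickstart archetype is the authoritative template.

{% highlight java %}
import org.apache.hadoop.util.ProgramDriver;

// Minimal sketch of a ProgramDriver-based driver (names are illustrative;
// the quickstart archetype generates the actual Driver.java).
public class Driver {
    public static void main(String[] args) {
        int exitCode = -1;
        ProgramDriver driver = new ProgramDriver();
        try {
            // map the command-line name "yarnjob" to the class containing the Flink program
            driver.addClass("yarnjob", YarnJob.class, "A Flink program that runs on YARN via Tez.");
            exitCode = driver.run(args);
        } catch (Throwable t) {
            t.printStackTrace();
        }
        System.exit(exitCode);
    }
}
{% endhighlight %}

With such a driver packaged into the fat jar, `hadoop jar ... yarnjob` dispatches to the `main` method of the registered class.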

Flink programs that run on YARN using Tez as an execution engine need to use the `RemoteTezEnvironment` and
register the class that contains the `main` method with that environment:
{% highlight java %}
public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final RemoteTezEnvironment env = RemoteTezEnvironment.create();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();

        // register the class containing the main method so it can be shipped to the cluster
        env.registerMainClass(WordCountExample.class);
        env.execute("Word Count Example");
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
{% endhighlight %}


## How it works

Flink on Tez reuses the Flink APIs, the Flink optimizer,
and the Flink local runtime, including Flink's hash table and sort implementations. Tez
replaces Flink's network stack and control plane, and is responsible for scheduling and
network shuffles.

The figure below shows how a Flink program passes through the Flink stack and generates
a Tez DAG (instead of the JobGraph that would be created using normal Flink execution).

<div style="text-align: center;">
<img src="fig/flink_on_tez_translation.png" alt="Translation of a Flink program to a Tez DAG." height="600px" vspace="20px" style="text-align: center;"/>
</div>

All local processing, including memory management, sorting, and hashing, is performed by
Flink as usual. Local processing is encapsulated in Tez vertices, as seen in the figure
below. Tez vertices are connected by edges. Tez is currently based on a key-value data
model. In the current implementation, the elements that are processed by Flink operators
are wrapped inside Tez values, and the Tez key field is used to indicate the index of the
target task that the elements are destined for.

<div style="text-align: center;">
<img src="fig/flink_tez_vertex.png" alt="Encapsulation of Flink runtime inside Tez vertices." height="200px" vspace="20px" style="text-align: center;"/>
</div>
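
To make the key/value mapping above more concrete, the following is a purely conceptual sketch (these are not the actual Flink-on-Tez classes): the "key" carries the index of the receiving task, and the "value" carries the Flink record.

{% highlight java %}
// Conceptual illustration only -- NOT the actual Flink-on-Tez implementation.
// It shows the idea of mapping a Flink record onto Tez's key-value model:
// the key encodes the index of the target task, the value wraps the record.
public class TargetedRecord<T> {

    private final int targetTaskIndex; // plays the role of the Tez "key"
    private final T record;            // plays the role of the Tez "value"

    public TargetedRecord(int targetTaskIndex, T record) {
        this.targetTaskIndex = targetTaskIndex;
        this.record = record;
    }

    public int getTargetTaskIndex() {
        return targetTaskIndex;
    }

    public T getRecord() {
        return record;
    }

    // Example of how a hash partitioner could pick the receiving task for a record
    public static int targetTaskFor(Object partitionKey, int numberOfTargetTasks) {
        return (partitionKey.hashCode() & Integer.MAX_VALUE) % numberOfTargetTasks;
    }
}
{% endhighlight %}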

## Limitations

Currently, Flink on Tez does not support all features of the Flink API. We are working
to enable all of the missing features listed below. In the meantime, if your project depends on these features, we suggest
using [Flink on YARN]({{site.baseurl}}/setup/yarn_setup.html) or [Flink standalone]({{site.baseurl}}/quickstart/setup_quickstart.html).

The following features are currently missing:

- Dedicated client: jobs need to be submitted via Hadoop's command-line client.

- Self-joins: currently, binary operators that receive the same input are not supported due to
[TEZ-1190](https://issues.apache.org/jira/browse/TEZ-1190) (see the illustration after this list).

- Iterative programs are currently not supported.

- Broadcast variables are currently not supported.

- Accumulators and counters are currently not supported.

- Performance: The current implementation has not been heavily tested for performance, and misses several optimizations,
including task chaining.

- Streaming API: Streaming programs will not currently compile to Tez DAGs.

- Scala API: The current implementation has only been tested with the Java API.
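
As a purely illustrative sketch of the self-join limitation above (class and data are made up for the example, and a regular `ExecutionEnvironment` is used for simplicity), the following pattern, where both inputs of a binary operator come from the same `DataSet`, is what currently cannot be executed on Tez:

{% highlight java %}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Illustration of the self-join pattern: both inputs of the join come from the
// same DataSet. On Tez, such a plan is currently not supported (see TEZ-1190).
public class SelfJoinExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> data = env.fromElements(
            new Tuple2<String, Integer>("a", 1),
            new Tuple2<String, Integer>("b", 2));

        // a binary operator (join) consuming the same input twice = a self-join
        data.join(data)
            .where(0)
            .equalTo(0)
            .print();
    }
}
{% endhighlight %}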