| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| <!DOCTYPE html> |
| |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> |
| |
| <title>Apache Flink 0.9.0 Documentation: Hadoop Compatibility</title> |
| |
| <link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon"> |
| |
| <!-- Bootstrap --> |
| <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css"> |
| <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css"> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| <body> |
| |
| |
| |
| |
| |
| |
| <!-- Top navbar. --> |
| <nav class="navbar navbar-default navbar-fixed-top"> |
| <div class="container"> |
| <!-- The logo. --> |
| <div class="navbar-header"> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <div class="navbar-logo"> |
| <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a> |
| </div> |
| </div><!-- /.navbar-header --> |
| |
| <!-- The navigation links. --> |
| <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> |
| <ul class="nav navbar-nav"> |
| <li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li> |
| |
| <!-- Setup --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li> |
| |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li> |
<li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html">Local</a></li>
| <li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
| <li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li> |
| </ul> |
| </li> |
| |
| <!-- Programming Guides --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li> |
| |
| <li class="divider"></li> |
<li><a href="http://flink.apache.org/docs/0.9/apis/scala_shell.html">Interactive Scala Shell</a></li>
| <li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li> |
<li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
| </ul> |
| </li> |
| |
| <!-- Libraries --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li> |
| </ul> |
| </li> |
| |
| <!-- Internals --> |
| <li class="dropdown"> |
| <a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li> |
| <li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li> |
| <li class="divider"></li> |
| <li role="presentation" class="dropdown-header"><strong>Internals</strong></li> |
<li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
| <li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li> |
| </ul> |
| </li> |
| </ul> |
| <form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html"> |
| <div class="form-group"> |
| <input type="text" class="form-control" name="q" placeholder="Search all pages"> |
| </div> |
| <button type="submit" class="btn btn-default">Search</button> |
| </form> |
| </div><!-- /.navbar-collapse --> |
| </div><!-- /.container --> |
| </nav> |
| |
| |
| |
| |
| <!-- Main content. --> |
| <div class="container"> |
| |
| |
| <div class="row"> |
| <div class="col-sm-10 col-sm-offset-1"> |
| <h1>Hadoop Compatibility <span class="badge">Beta</span></h1> |
| |
| |
| |
| <p>Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows |
| reusing code that was implemented for Hadoop MapReduce.</p> |
| |
| <p>You can:</p> |
| |
| <ul> |
| <li>use Hadoop’s <code>Writable</code> <a href="programming_guide.html#data-types">data types</a> in Flink programs.</li> |
| <li>use any Hadoop <code>InputFormat</code> as a <a href="programming_guide.html#data-sources">DataSource</a>.</li> |
| <li>use any Hadoop <code>OutputFormat</code> as a <a href="programming_guide.html#data-sinks">DataSink</a>.</li> |
| <li>use a Hadoop <code>Mapper</code> as <a href="dataset_transformations.html#flatmap">FlatMapFunction</a>.</li> |
| <li>use a Hadoop <code>Reducer</code> as <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunction</a>.</li> |
| </ul> |
| |
| <p>This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the |
| <a href="example_connectors.html">Connecting to other systems</a> guide for reading from Hadoop supported file systems.</p> |
| |
| <ul id="markdown-toc"> |
| <li><a href="#project-configuration" id="markdown-toc-project-configuration">Project Configuration</a></li> |
| <li><a href="#using-hadoop-data-types" id="markdown-toc-using-hadoop-data-types">Using Hadoop Data Types</a></li> |
| <li><a href="#using-hadoop-inputformats" id="markdown-toc-using-hadoop-inputformats">Using Hadoop InputFormats</a></li> |
| <li><a href="#using-hadoop-outputformats" id="markdown-toc-using-hadoop-outputformats">Using Hadoop OutputFormats</a></li> |
| <li><a href="#using-hadoop-mappers-and-reducers" id="markdown-toc-using-hadoop-mappers-and-reducers">Using Hadoop Mappers and Reducers</a></li> |
| <li><a href="#complete-hadoop-wordcount-example" id="markdown-toc-complete-hadoop-wordcount-example">Complete Hadoop WordCount Example</a></li> |
| </ul> |
| |
| <h3 id="project-configuration">Project Configuration</h3> |
| |
<p>Support for Hadoop input/output formats is part of the <code>flink-java</code> and
| <code>flink-scala</code> Maven modules that are always required when writing Flink jobs. |
| The code is located in <code>org.apache.flink.api.java.hadoop</code> and |
| <code>org.apache.flink.api.scala.hadoop</code> in an additional sub-package for the |
| <code>mapred</code> and <code>mapreduce</code> API.</p> |
| |
| <p>Support for Hadoop Mappers and Reducers is contained in the <code>flink-staging</code> |
| Maven module. |
| This code resides in the <code>org.apache.flink.hadoopcompatibility</code> |
| package.</p> |
| |
| <p>Add the following dependency to your <code>pom.xml</code> if you want to reuse Mappers |
| and Reducers.</p> |
| |
| <div class="highlight"><pre><code class="language-xml"><span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.flink<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>flink-hadoop-compatibility<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>0.9.0<span class="nt"></version></span> |
| <span class="nt"></dependency></span></code></pre></div> |
| |
| <h3 id="using-hadoop-data-types">Using Hadoop Data Types</h3> |
| |
| <p>Flink supports all Hadoop <code>Writable</code> and <code>WritableComparable</code> data types |
| out-of-the-box. You do not need to include the Hadoop Compatibility dependency, |
| if you only want to use your Hadoop data types. See the |
| <a href="programming_guide.html#data-types">Programming Guide</a> for more details.</p> |
| |
| <h3 id="using-hadoop-inputformats">Using Hadoop InputFormats</h3> |
| |
| <p>Hadoop input formats can be used to create a data source by using |
| one of the methods <code>readHadoopFile</code> or <code>createHadoopInput</code> of the |
| <code>ExecutionEnvironment</code>. The former is used for input formats derived |
| from <code>FileInputFormat</code> while the latter has to be used for general purpose |
| input formats.</p> |
| |
| <p>The resulting <code>DataSet</code> contains 2-tuples where the first field |
| is the key and the second field is the value retrieved from the Hadoop |
| InputFormat.</p> |
| |
| <p>The following example shows how to use Hadoop’s <code>TextInputFormat</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span> |
| |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>></span> <span class="n">input</span> <span class="o">=</span> |
| <span class="n">env</span><span class="o">.</span><span class="na">readHadoopFile</span><span class="o">(</span><span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">textPath</span><span class="o">);</span> |
| |
| <span class="c1">// Do something with the data.</span> |
| <span class="o">[...]</span></code></pre></div> |
| |
| </div> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span> |
| |
| <span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">LongWritable</span>, <span class="kt">Text</span><span class="o">)]</span> <span class="k">=</span> |
| <span class="n">env</span><span class="o">.</span><span class="n">readHadoopFile</span><span class="o">(</span><span class="k">new</span> <span class="nc">TextInputFormat</span><span class="o">,</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">LongWritable</span><span class="o">],</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">Text</span><span class="o">],</span> <span class="n">textPath</span><span class="o">)</span> |
| |
| <span class="c1">// Do something with the data.</span> |
| <span class="o">[</span><span class="kt">...</span><span class="o">]</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="using-hadoop-outputformats">Using Hadoop OutputFormats</h3> |
| |
| <p>Flink provides a compatibility wrapper for Hadoop <code>OutputFormats</code>. Any class |
| that implements <code>org.apache.hadoop.mapred.OutputFormat</code> or extends |
| <code>org.apache.hadoop.mapreduce.OutputFormat</code> is supported. |
| The OutputFormat wrapper expects its input data to be a DataSet containing |
| 2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.</p> |
| |
| <p>The following example shows how to use Hadoop’s <code>TextOutputFormat</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="c1">// Obtain the result we want to emit</span> |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>></span> <span class="n">hadoopResult</span> <span class="o">=</span> <span class="o">[...]</span> |
| |
| <span class="c1">// Set up the Hadoop TextOutputFormat.</span> |
| <span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">></span> <span class="n">hadoopOF</span> <span class="o">=</span> |
| <span class="c1">// create the Flink wrapper.</span> |
| <span class="k">new</span> <span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>(</span> |
| <span class="c1">// set the Hadoop OutputFormat and specify the job.</span> |
| <span class="k">new</span> <span class="n">TextOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>(),</span> <span class="n">job</span> |
| <span class="o">);</span> |
| <span class="n">hadoopOF</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">"mapreduce.output.textoutputformat.separator"</span><span class="o">,</span> <span class="s">" "</span><span class="o">);</span> |
| <span class="n">TextOutputFormat</span><span class="o">.</span><span class="na">setOutputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">outputPath</span><span class="o">));</span> |
| |
| <span class="c1">// Emit data using the Hadoop TextOutputFormat.</span> |
| <span class="n">hadoopResult</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala"><span class="c1">// Obtain your result to emit.</span> |
| <span class="k">val</span> <span class="n">hadoopResult</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Text</span>, <span class="kt">IntWritable</span><span class="o">)]</span> <span class="k">=</span> <span class="o">[</span><span class="kt">...</span><span class="o">]</span> |
| |
| <span class="k">val</span> <span class="n">hadoopOF</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">HadoopOutputFormat</span><span class="o">[</span><span class="kt">Text</span>,<span class="kt">IntWritable</span><span class="o">](</span> |
| <span class="k">new</span> <span class="nc">TextOutputFormat</span><span class="o">[</span><span class="kt">Text</span>, <span class="kt">IntWritable</span><span class="o">],</span> |
| <span class="k">new</span> <span class="nc">JobConf</span><span class="o">)</span> |
| |
| <span class="n">hadoopOF</span><span class="o">.</span><span class="n">getJobConf</span><span class="o">.</span><span class="n">set</span><span class="o">(</span><span class="s">"mapred.textoutputformat.separator"</span><span class="o">,</span> <span class="s">" "</span><span class="o">)</span> |
| <span class="nc">FileOutputFormat</span><span class="o">.</span><span class="n">setOutputPath</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">.</span><span class="n">getJobConf</span><span class="o">,</span> <span class="k">new</span> <span class="nc">Path</span><span class="o">(</span><span class="n">resultPath</span><span class="o">))</span> |
| |
| <span class="n">hadoopResult</span><span class="o">.</span><span class="n">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="using-hadoop-mappers-and-reducers">Using Hadoop Mappers and Reducers</h3> |
| |
| <p>Hadoop Mappers are semantically equivalent to Flink’s <a href="dataset_transformations.html#flatmap">FlatMapFunctions</a> and Hadoop Reducers are equivalent to Flink’s <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunctions</a>. Flink provides wrappers for implementations of Hadoop MapReduce’s <code>Mapper</code> and <code>Reducer</code> interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop’s mapred API (<code>org.apache.hadoop.mapred</code>) are supported.</p> |
| |
| <p>The wrappers take a <code>DataSet<Tuple2<KEYIN,VALUEIN>></code> as input and produce a <code>DataSet<Tuple2<KEYOUT,VALUEOUT>></code> as output where <code>KEYIN</code> and <code>KEYOUT</code> are the keys and <code>VALUEIN</code> and <code>VALUEOUT</code> are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (<code>HadoopReduceCombineFunction</code>) and without a Combiner (<code>HadoopReduceFunction</code>). The wrappers accept an optional <code>JobConf</code> object to configure the Hadoop Mapper or Reducer.</p> |
| |
| <p>Flink’s function wrappers are</p> |
| |
| <ul> |
| <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction</code>,</li> |
| <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction</code>, and</li> |
| <li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction</code>.</li> |
| </ul> |
| |
| <p>and can be used as regular Flink <a href="dataset_transformations.html#flatmap">FlatMapFunctions</a> or <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunctions</a>.</p> |
| |
| <p>The following example shows how to use Hadoop <code>Mapper</code> and <code>Reducer</code> functions.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="c1">// Obtain data to process somehow.</span> |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>></span> <span class="n">text</span> <span class="o">=</span> <span class="o">[...]</span> |
| |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>></span> <span class="n">result</span> <span class="o">=</span> <span class="n">text</span> |
| <span class="c1">// use Hadoop Mapper (Tokenizer) as MapFunction</span> |
| <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopMapFunction</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span> |
| <span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()</span> |
| <span class="o">))</span> |
| <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| <span class="c1">// use Hadoop Reducer (Counter) as Reduce- and CombineFunction</span> |
| <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopReduceCombineFunction</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span> |
| <span class="k">new</span> <span class="nf">Counter</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Counter</span><span class="o">()</span> |
| <span class="o">));</span></code></pre></div> |
| |
| <p><strong>Please note:</strong> The Reducer wrapper works on groups as defined by Flink’s <a href="dataset_transformations.html#transformations-on-grouped-dataset">groupBy()</a> operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the <code>JobConf</code>.</p> |
| |
| <h3 id="complete-hadoop-wordcount-example">Complete Hadoop WordCount Example</h3> |
| |
| <p>The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span> |
| |
| <span class="c1">// Set up the Hadoop TextInputFormat.</span> |
| <span class="n">Job</span> <span class="n">job</span> <span class="o">=</span> <span class="n">Job</span><span class="o">.</span><span class="na">getInstance</span><span class="o">();</span> |
| <span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">></span> <span class="n">hadoopIF</span> <span class="o">=</span> |
| <span class="k">new</span> <span class="n">HadoopInputFormat</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>(</span> |
| <span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">job</span> |
| <span class="o">);</span> |
| <span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">inputPath</span><span class="o">));</span> |
| |
| <span class="c1">// Read data using the Hadoop TextInputFormat.</span> |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">>></span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">hadoopIF</span><span class="o">);</span> |
| |
| <span class="n">DataSet</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>></span> <span class="n">result</span> <span class="o">=</span> <span class="n">text</span> |
| <span class="c1">// use Hadoop Mapper (Tokenizer) as MapFunction</span> |
| <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopMapFunction</span><span class="o"><</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span> |
| <span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()</span> |
| <span class="o">))</span> |
| <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| <span class="c1">// use Hadoop Reducer (Counter) as Reduce- and CombineFunction</span> |
| <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopReduceCombineFunction</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">>(</span> |
| <span class="k">new</span> <span class="nf">Counter</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Counter</span><span class="o">()</span> |
| <span class="o">));</span> |
| |
| <span class="c1">// Set up the Hadoop TextOutputFormat.</span> |
| <span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">></span> <span class="n">hadoopOF</span> <span class="o">=</span> |
| <span class="k">new</span> <span class="n">HadoopOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>(</span> |
| <span class="k">new</span> <span class="n">TextOutputFormat</span><span class="o"><</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">>(),</span> <span class="n">job</span> |
| <span class="o">);</span> |
| <span class="n">hadoopOF</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">"mapreduce.output.textoutputformat.separator"</span><span class="o">,</span> <span class="s">" "</span><span class="o">);</span> |
| <span class="n">TextOutputFormat</span><span class="o">.</span><span class="na">setOutputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">outputPath</span><span class="o">));</span> |
| |
| <span class="c1">// Emit data using the Hadoop TextOutputFormat.</span> |
| <span class="n">result</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">);</span> |
| |
| <span class="c1">// Execute Program</span> |
| <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">"Hadoop WordCount"</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div class="col-sm-10 col-sm-offset-1"> |
| <!-- Disqus thread and some vertical offset --> |
| <div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div> |
| </div> |
| </div> |
| |
| </div><!-- /.container --> |
| |
| <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> |
| <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script> |
| <!-- Include all compiled plugins (below), or include individual files as needed --> |
| <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script> |
| <script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script> |
| |
| <!-- Google Analytics --> |
<script>
// Google Analytics bootstrap (the standard "isogram" loader snippet):
// defines window.ga as a command queue and asynchronously injects
// analytics.js into the document so page loading is not blocked.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');

// Register the Flink docs tracking property and record this page view.
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
| |
| <!-- Disqus --> |
<script type="text/javascript">
// Disqus comment-thread embed: asynchronously loads embed.js for the
// 'stratosphere-eu' forum, which renders comments into the
// #disqus_thread div defined earlier on this page.
var disqus_shortname = 'stratosphere-eu';
(function() {
// Create the <script> element and mark it async so it does not block rendering.
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
// Append to <head> when available, otherwise fall back to <body>.
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
| </body> |
| </html> |