blob: cbbafe79eb50d3bf9fc448208bc0d5b1339789b9 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink 0.9.0 Documentation: Hadoop Compatibility</title>
<link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css">
<link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css">
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Top navbar. -->
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li>
<!-- Setup -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li>
</ul>
</li>
<!-- Programming Guides -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li>
<li class="divider"></li>
<li><a href="scala_shell.html">Interactive Scala Shell</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li>
<li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Libraries -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
<li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
</ul>
</li>
<!-- Internals -->
<li class="dropdown">
<a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li>
<li class="divider"></li>
<li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
<li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li>
</ul>
</li>
</ul>
<form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html">
<div class="form-group">
<input type="text" class="form-control" name="q" placeholder="Search all pages">
</div>
<button type="submit" class="btn btn-default">Search</button>
</form>
</div><!-- /.navbar-collapse -->
</div><!-- /.container -->
</nav>
<!-- Main content. -->
<div class="container">
<div class="row">
<div class="col-sm-10 col-sm-offset-1">
<h1>Hadoop Compatibility <span class="badge">Beta</span></h1>
<p>Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
reusing code that was implemented for Hadoop MapReduce.</p>
<p>You can:</p>
<ul>
<li>use Hadoop’s <code>Writable</code> <a href="programming_guide.html#data-types">data types</a> in Flink programs.</li>
<li>use any Hadoop <code>InputFormat</code> as a <a href="programming_guide.html#data-sources">DataSource</a>.</li>
<li>use any Hadoop <code>OutputFormat</code> as a <a href="programming_guide.html#data-sinks">DataSink</a>.</li>
<li>use a Hadoop <code>Mapper</code> as <a href="dataset_transformations.html#flatmap">FlatMapFunction</a>.</li>
<li>use a Hadoop <code>Reducer</code> as <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunction</a>.</li>
</ul>
<p>This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
<a href="example_connectors.html">Connecting to other systems</a> guide for reading from Hadoop supported file systems.</p>
<ul id="markdown-toc">
<li><a href="#project-configuration" id="markdown-toc-project-configuration">Project Configuration</a></li>
<li><a href="#using-hadoop-data-types" id="markdown-toc-using-hadoop-data-types">Using Hadoop Data Types</a></li>
<li><a href="#using-hadoop-inputformats" id="markdown-toc-using-hadoop-inputformats">Using Hadoop InputFormats</a></li>
<li><a href="#using-hadoop-outputformats" id="markdown-toc-using-hadoop-outputformats">Using Hadoop OutputFormats</a></li>
<li><a href="#using-hadoop-mappers-and-reducers" id="markdown-toc-using-hadoop-mappers-and-reducers">Using Hadoop Mappers and Reducers</a></li>
<li><a href="#complete-hadoop-wordcount-example" id="markdown-toc-complete-hadoop-wordcount-example">Complete Hadoop WordCount Example</a></li>
</ul>
<h3 id="project-configuration">Project Configuration</h3>
<p>Support for Hadoop input/output formats is part of the <code>flink-java</code> and
<code>flink-scala</code> Maven modules that are always required when writing Flink jobs.
The code is located in <code>org.apache.flink.api.java.hadoop</code> and
<code>org.apache.flink.api.scala.hadoop</code> in an additional sub-package for the
<code>mapred</code> and <code>mapreduce</code> API.</p>
<p>Support for Hadoop Mappers and Reducers is contained in the <code>flink-staging</code>
Maven module.
This code resides in the <code>org.apache.flink.hadoopcompatibility</code>
package.</p>
<p>Add the following dependency to your <code>pom.xml</code> if you want to reuse Mappers
and Reducers.</p>
<div class="highlight"><pre><code class="language-xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-hadoop-compatibility<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.9.0<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span></code></pre></div>
<h3 id="using-hadoop-data-types">Using Hadoop Data Types</h3>
<p>Flink supports all Hadoop <code>Writable</code> and <code>WritableComparable</code> data types
out-of-the-box. You do not need to include the Hadoop Compatibility dependency,
if you only want to use your Hadoop data types. See the
<a href="programming_guide.html#data-types">Programming Guide</a> for more details.</p>
<h3 id="using-hadoop-inputformats">Using Hadoop InputFormats</h3>
<p>Hadoop input formats can be used to create a data source by using
one of the methods <code>readHadoopFile</code> or <code>createHadoopInput</code> of the
<code>ExecutionEnvironment</code>. The former is used for input formats derived
from <code>FileInputFormat</code> while the latter has to be used for general purpose
input formats.</p>
<p>The resulting <code>DataSet</code> contains 2-tuples where the first field
is the key and the second field is the value retrieved from the Hadoop
InputFormat.</p>
<p>The following example shows how to use Hadoop’s <code>TextInputFormat</code>.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span>
<span class="n">env</span><span class="o">.</span><span class="na">readHadoopFile</span><span class="o">(</span><span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">textPath</span><span class="o">);</span>
<span class="c1">// Do something with the data.</span>
<span class="o">[...]</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
<span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">LongWritable</span>, <span class="kt">Text</span><span class="o">)]</span> <span class="k">=</span>
<span class="n">env</span><span class="o">.</span><span class="n">readHadoopFile</span><span class="o">(</span><span class="k">new</span> <span class="nc">TextInputFormat</span><span class="o">,</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">LongWritable</span><span class="o">],</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">Text</span><span class="o">],</span> <span class="n">textPath</span><span class="o">)</span>
<span class="c1">// Do something with the data.</span>
<span class="o">[</span><span class="kt">...</span><span class="o">]</span></code></pre></div>
</div>
</div>
<h3 id="using-hadoop-outputformats">Using Hadoop OutputFormats</h3>
<p>Flink provides a compatibility wrapper for Hadoop <code>OutputFormats</code>. Any class
that implements <code>org.apache.hadoop.mapred.OutputFormat</code> or extends
<code>org.apache.hadoop.mapreduce.OutputFormat</code> is supported.
The OutputFormat wrapper expects its input data to be a DataSet containing
2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.</p>
<p>The following example shows how to use Hadoop’s <code>TextOutputFormat</code>.</p>
<div class="codetabs">
<div data-lang="java">
<div class="highlight"><pre><code class="language-java"><span class="c1">// Obtain the result we want to emit</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">&gt;&gt;</span> <span class="n">hadoopResult</span> <span class="o">=</span> <span class="o">[...]</span>
<span class="c1">// Set up the Hadoop TextOutputFormat.</span>
<span class="n">HadoopOutputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">&gt;</span> <span class="n">hadoopOF</span> <span class="o">=</span>
<span class="c1">// create the Flink wrapper.</span>
<span class="k">new</span> <span class="n">HadoopOutputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">&gt;(</span>
<span class="c1">// set the Hadoop OutputFormat and specify the job.</span>
<span class="k">new</span> <span class="n">TextOutputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">IntWritable</span><span class="o">&gt;(),</span> <span class="n">job</span>
<span class="o">);</span>
<span class="n">hadoopOF</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">&quot;mapreduce.output.textoutputformat.separator&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">);</span>
<span class="n">TextOutputFormat</span><span class="o">.</span><span class="na">setOutputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">outputPath</span><span class="o">));</span>
<span class="c1">// Emit data using the Hadoop TextOutputFormat.</span>
<span class="n">hadoopResult</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala"><span class="c1">// Obtain your result to emit.</span>
<span class="k">val</span> <span class="n">hadoopResult</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Text</span>, <span class="kt">IntWritable</span><span class="o">)]</span> <span class="k">=</span> <span class="o">[</span><span class="kt">...</span><span class="o">]</span>
<span class="k">val</span> <span class="n">hadoopOF</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">HadoopOutputFormat</span><span class="o">[</span><span class="kt">Text</span>,<span class="kt">IntWritable</span><span class="o">](</span>
<span class="k">new</span> <span class="nc">TextOutputFormat</span><span class="o">[</span><span class="kt">Text</span>, <span class="kt">IntWritable</span><span class="o">],</span>
<span class="k">new</span> <span class="nc">JobConf</span><span class="o">)</span>
<span class="n">hadoopOF</span><span class="o">.</span><span class="n">getJobConf</span><span class="o">.</span><span class="n">set</span><span class="o">(</span><span class="s">&quot;mapred.textoutputformat.separator&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">)</span>
<span class="nc">FileOutputFormat</span><span class="o">.</span><span class="n">setOutputPath</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">.</span><span class="n">getJobConf</span><span class="o">,</span> <span class="k">new</span> <span class="nc">Path</span><span class="o">(</span><span class="n">resultPath</span><span class="o">))</span>
<span class="n">hadoopResult</span><span class="o">.</span><span class="n">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">)</span></code></pre></div>
</div>
</div>
<h3 id="using-hadoop-mappers-and-reducers">Using Hadoop Mappers and Reducers</h3>
<p>Hadoop Mappers are semantically equivalent to Flink’s <a href="dataset_transformations.html#flatmap">FlatMapFunctions</a> and Hadoop Reducers are equivalent to Flink’s <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunctions</a>. Flink provides wrappers for implementations of Hadoop MapReduce’s <code>Mapper</code> and <code>Reducer</code> interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reducer interfaces of Hadoop’s mapred API (<code>org.apache.hadoop.mapred</code>) are supported.</p>
<p>The wrappers take a <code>DataSet&lt;Tuple2&lt;KEYIN,VALUEIN&gt;&gt;</code> as input and produce a <code>DataSet&lt;Tuple2&lt;KEYOUT,VALUEOUT&gt;&gt;</code> as output where <code>KEYIN</code> and <code>KEYOUT</code> are the keys and <code>VALUEIN</code> and <code>VALUEOUT</code> are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (<code>HadoopReduceCombineFunction</code>) and without a Combiner (<code>HadoopReduceFunction</code>). The wrappers accept an optional <code>JobConf</code> object to configure the Hadoop Mapper or Reducer.</p>
<p>Flink’s function wrappers are</p>
<ul>
<li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction</code>,</li>
<li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction</code>, and</li>
<li><code>org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction</code>.</li>
</ul>
<p>and can be used as regular Flink <a href="dataset_transformations.html#flatmap">FlatMapFunctions</a> or <a href="dataset_transformations.html#groupreduce-on-grouped-dataset">GroupReduceFunctions</a>.</p>
<p>The following example shows how to use Hadoop <code>Mapper</code> and <code>Reducer</code> functions.</p>
<div class="highlight"><pre><code class="language-java"><span class="c1">// Obtain data to process somehow.</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">&gt;&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="o">[...]</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">text</span>
<span class="c1">// use Hadoop Mapper (Tokenizer) as MapFunction</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopMapFunction</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;(</span>
<span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()</span>
<span class="o">))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="c1">// use Hadoop Reducer (Counter) as Reduce- and CombineFunction</span>
<span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopReduceCombineFunction</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;(</span>
<span class="k">new</span> <span class="nf">Counter</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Counter</span><span class="o">()</span>
<span class="o">));</span></code></pre></div>
<p><strong>Please note:</strong> The Reducer wrapper works on groups as defined by Flink’s <a href="dataset_transformations.html#transformations-on-grouped-dataset">groupBy()</a> operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the <code>JobConf</code>.</p>
<h3 id="complete-hadoop-wordcount-example">Complete Hadoop WordCount Example</h3>
<p>The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// Set up the Hadoop TextInputFormat.</span>
<span class="n">Job</span> <span class="n">job</span> <span class="o">=</span> <span class="n">Job</span><span class="o">.</span><span class="na">getInstance</span><span class="o">();</span>
<span class="n">HadoopInputFormat</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">&gt;</span> <span class="n">hadoopIF</span> <span class="o">=</span>
<span class="k">new</span> <span class="n">HadoopInputFormat</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">&gt;(</span>
<span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">job</span>
<span class="o">);</span>
<span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">inputPath</span><span class="o">));</span>
<span class="c1">// Read data using the Hadoop TextInputFormat.</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">&gt;&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">createInput</span><span class="o">(</span><span class="n">hadoopIF</span><span class="o">);</span>
<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">text</span>
<span class="c1">// use Hadoop Mapper (Tokenizer) as MapFunction</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopMapFunction</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;(</span>
<span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()</span>
<span class="o">))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="c1">// use Hadoop Reducer (Counter) as Reduce- and CombineFunction</span>
<span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">HadoopReduceCombineFunction</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;(</span>
<span class="k">new</span> <span class="nf">Counter</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Counter</span><span class="o">()</span>
<span class="o">));</span>
<span class="c1">// Set up the Hadoop TextOutputFormat.</span>
<span class="n">HadoopOutputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;</span> <span class="n">hadoopOF</span> <span class="o">=</span>
<span class="k">new</span> <span class="n">HadoopOutputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;(</span>
<span class="k">new</span> <span class="n">TextOutputFormat</span><span class="o">&lt;</span><span class="n">Text</span><span class="o">,</span> <span class="n">LongWritable</span><span class="o">&gt;(),</span> <span class="n">job</span>
<span class="o">);</span>
<span class="n">hadoopOF</span><span class="o">.</span><span class="na">getConfiguration</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">&quot;mapreduce.output.textoutputformat.separator&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">);</span>
<span class="n">TextOutputFormat</span><span class="o">.</span><span class="na">setOutputPath</span><span class="o">(</span><span class="n">job</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">outputPath</span><span class="o">));</span>
<span class="c1">// Emit data using the Hadoop TextOutputFormat.</span>
<span class="n">result</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">hadoopOF</span><span class="o">);</span>
<span class="c1">// Execute Program</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Hadoop WordCount&quot;</span><span class="o">);</span></code></pre></div>
</div>
<div class="col-sm-10 col-sm-offset-1">
<!-- Disqus thread and some vertical offset -->
<div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
</div>
</div>
</div><!-- /.container -->
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="http://flink.apache.org/docs/0.9/page/js/codetabs.js"></script>
<!-- Google Analytics -->
<script>
// Google Analytics bootstrap snippet (kept byte-identical; statement order matters).
// The IIFE receives (window, document, 'script', <analytics.js URL>, 'ga') and:
//   1. records the global name 'ga' under window['GoogleAnalyticsObject'];
//   2. defines window.ga, unless it already exists, as a stub that pushes its
//      arguments onto the ga.q array;
//   3. stores the current time (as a number) in ga.l;
//   4. creates a <script> element with async set and src set to the URL, and
//      inserts it before the first <script> element already in the document.
// NOTE(review): the URL is protocol-relative ('//…'), so it follows the page's scheme.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
// Calls made through the ga() function defined above: create a tracker for
// property 'UA-52545728-1' with the 'auto' setting, then send a pageview.
// Presumably these are queued by the stub until analytics.js loads — confirm against the GA docs.
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
<!-- Disqus -->
<script type="text/javascript">
// Disqus site identifier; used below to build the embed URL. Declared with `var`
// at script top level so it remains a global, exactly as in the original snippet.
var disqus_shortname = 'stratosphere-eu';
(function () {
  // Create an async <script> element pointing at this site's Disqus embed.js.
  var embedScript = document.createElement('script');
  embedScript.type = 'text/javascript';
  embedScript.async = true;
  embedScript.src = '//' + disqus_shortname + '.disqus.com/embed.js';

  // Attach the loader to <head> when one exists, otherwise fall back to <body>.
  var hostElement = document.getElementsByTagName('head')[0] ||
      document.getElementsByTagName('body')[0];
  hostElement.appendChild(embedScript);
})();
</script>
</body>
</html>