<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<META http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta content="Apache Forrest" name="Generator">
<meta name="Forrest-version" content="0.8">
<meta name="Forrest-skin-name" content="pelt">
<title>Hadoop Map-Reduce Tutorial</title>
<link type="text/css" href="skin/basic.css" rel="stylesheet">
<link media="screen" type="text/css" href="skin/screen.css" rel="stylesheet">
<link media="print" type="text/css" href="skin/print.css" rel="stylesheet">
<link type="text/css" href="skin/profile.css" rel="stylesheet">
<script src="skin/getBlank.js" language="javascript" type="text/javascript"></script><script src="skin/getMenu.js" language="javascript" type="text/javascript"></script><script src="skin/fontsize.js" language="javascript" type="text/javascript"></script>
<link rel="shortcut icon" href="images/favicon.ico">
</head>
<body onload="init()">
<script type="text/javascript">ndeSetTextSize();</script>
<div id="top">
<!--+
|breadtrail
+-->
<div class="breadtrail">
<a href="http://www.apache.org/">Apache</a> &gt; <a href="http://hadoop.apache.org/">Hadoop</a> &gt; <a href="http://hadoop.apache.org/core/">Core</a><script src="skin/breadcrumbs.js" language="JavaScript" type="text/javascript"></script>
</div>
<!--+
|header
+-->
<div class="header">
<!--+
|start group logo
+-->
<div class="grouplogo">
<a href="http://hadoop.apache.org/"><img class="logoImage" alt="Hadoop" src="images/hadoop-logo.jpg" title="Apache Hadoop"></a>
</div>
<!--+
|end group logo
+-->
<!--+
|start Project Logo
+-->
<div class="projectlogo">
<a href="http://hadoop.apache.org/core/"><img class="logoImage" alt="Hadoop" src="images/core-logo.gif" title="Scalable Computing Platform"></a>
</div>
<!--+
|end Project Logo
+-->
<!--+
|start Search
+-->
<div class="searchbox">
<form action="http://www.google.com/search" method="get" class="roundtopsmall">
<input value="hadoop.apache.org" name="sitesearch" type="hidden"><input onFocus="getBlank (this, 'Search the site with google');" size="25" name="q" id="query" type="text" value="Search the site with google">&nbsp;
<input name="Search" value="Search" type="submit">
</form>
</div>
<!--+
|end search
+-->
<!--+
|start Tabs
+-->
<ul id="tabs">
<li>
<a class="unselected" href="http://hadoop.apache.org/core/">Project</a>
</li>
<li>
<a class="unselected" href="http://wiki.apache.org/hadoop">Wiki</a>
</li>
<li class="current">
<a class="selected" href="index.html">Hadoop 0.17 Documentation</a>
</li>
</ul>
<!--+
|end Tabs
+-->
</div>
</div>
<div id="main">
<div id="publishedStrip">
<!--+
|start Subtabs
+-->
<div id="level2tabs"></div>
<!--+
|end Endtabs
+-->
<script type="text/javascript"><!--
document.write("Last Published: " + document.lastModified);
// --></script>
</div>
<!--+
|breadtrail
+-->
<div class="breadtrail">
&nbsp;
</div>
<!--+
|start Menu, mainarea
+-->
<!--+
|start Menu
+-->
<div id="menu">
<div onclick="SwitchMenu('menu_selected_1.1', 'skin/')" id="menu_selected_1.1Title" class="menutitle" style="background-image: url('skin/images/chapter_open.gif');">Documentation</div>
<div id="menu_selected_1.1" class="selectedmenuitemgroup" style="display: block;">
<div class="menuitem">
<a href="index.html">Overview</a>
</div>
<div class="menuitem">
<a href="quickstart.html">Quickstart</a>
</div>
<div class="menuitem">
<a href="cluster_setup.html">Cluster Setup</a>
</div>
<div class="menuitem">
<a href="hdfs_design.html">HDFS Architecture</a>
</div>
<div class="menuitem">
<a href="hdfs_user_guide.html">HDFS User Guide</a>
</div>
<div class="menuitem">
<a href="hdfs_shell.html">HDFS Shell Guide</a>
</div>
<div class="menuitem">
<a href="hdfs_permissions_guide.html">HDFS Permissions Guide</a>
</div>
<div class="menupage">
<div class="menupagetitle">Map-Reduce Tutorial</div>
</div>
<div class="menuitem">
<a href="native_libraries.html">Native Hadoop Libraries</a>
</div>
<div class="menuitem">
<a href="streaming.html">Streaming</a>
</div>
<div class="menuitem">
<a href="hod.html">Hadoop On Demand</a>
</div>
<div class="menuitem">
<a href="api/index.html">API Docs</a>
</div>
<div class="menuitem">
<a href="http://wiki.apache.org/hadoop/">Wiki</a>
</div>
<div class="menuitem">
<a href="http://wiki.apache.org/hadoop/FAQ">FAQ</a>
</div>
<div class="menuitem">
<a href="http://hadoop.apache.org/core/mailing_lists.html">Mailing Lists</a>
</div>
<div class="menuitem">
<a href="releasenotes.html">Release Notes</a>
</div>
<div class="menuitem">
<a href="changes.html">All Changes</a>
</div>
</div>
<div id="credit"></div>
<div id="roundbottom">
<img style="display: none" class="corner" height="15" width="15" alt="" src="skin/images/rc-b-l-15-1body-2menu-3menu.png"></div>
<!--+
|alternative credits
+-->
<div id="credit2"></div>
</div>
<!--+
|end Menu
+-->
<!--+
|start content
+-->
<div id="content">
<div title="Portable Document Format" class="pdflink">
<a class="dida" href="mapred_tutorial.pdf"><img alt="PDF -icon" src="skin/images/pdfdoc.gif" class="skin"><br>
PDF</a>
</div>
<h1>Hadoop Map-Reduce Tutorial</h1>
<div id="minitoc-area">
<ul class="minitoc">
<li>
<a href="#Purpose">Purpose</a>
</li>
<li>
<a href="#Pre-requisites">Pre-requisites</a>
</li>
<li>
<a href="#Overview">Overview</a>
</li>
<li>
<a href="#Inputs+and+Outputs">Inputs and Outputs</a>
</li>
<li>
<a href="#Example%3A+WordCount+v1.0">Example: WordCount v1.0</a>
<ul class="minitoc">
<li>
<a href="#Source+Code">Source Code</a>
</li>
<li>
<a href="#Usage">Usage</a>
</li>
<li>
<a href="#Walk-through">Walk-through</a>
</li>
</ul>
</li>
<li>
<a href="#Map-Reduce+-+User+Interfaces">Map-Reduce - User Interfaces</a>
<ul class="minitoc">
<li>
<a href="#Payload">Payload</a>
<ul class="minitoc">
<li>
<a href="#Mapper">Mapper</a>
</li>
<li>
<a href="#Reducer">Reducer</a>
</li>
<li>
<a href="#Partitioner">Partitioner</a>
</li>
<li>
<a href="#Reporter">Reporter</a>
</li>
<li>
<a href="#OutputCollector">OutputCollector</a>
</li>
</ul>
</li>
<li>
<a href="#Job+Configuration">Job Configuration</a>
</li>
<li>
<a href="#Task+Execution+%26+Environment">Task Execution &amp; Environment</a>
</li>
<li>
<a href="#Job+Submission+and+Monitoring">Job Submission and Monitoring</a>
<ul class="minitoc">
<li>
<a href="#Job+Control">Job Control</a>
</li>
</ul>
</li>
<li>
<a href="#Job+Input">Job Input</a>
<ul class="minitoc">
<li>
<a href="#InputSplit">InputSplit</a>
</li>
<li>
<a href="#RecordReader">RecordReader</a>
</li>
</ul>
</li>
<li>
<a href="#Job+Output">Job Output</a>
<ul class="minitoc">
<li>
<a href="#Task+Side-Effect+Files">Task Side-Effect Files</a>
</li>
<li>
<a href="#RecordWriter">RecordWriter</a>
</li>
</ul>
</li>
<li>
<a href="#Other+Useful+Features">Other Useful Features</a>
<ul class="minitoc">
<li>
<a href="#Counters">Counters</a>
</li>
<li>
<a href="#DistributedCache">DistributedCache</a>
</li>
<li>
<a href="#Tool">Tool</a>
</li>
<li>
<a href="#IsolationRunner">IsolationRunner</a>
</li>
<li>
<a href="#Debugging">Debugging</a>
</li>
<li>
<a href="#JobControl">JobControl</a>
</li>
<li>
<a href="#Data+Compression">Data Compression</a>
</li>
</ul>
</li>
</ul>
</li>
<li>
<a href="#Example%3A+WordCount+v2.0">Example: WordCount v2.0</a>
<ul class="minitoc">
<li>
<a href="#Source+Code-N10C84">Source Code</a>
</li>
<li>
<a href="#Sample+Runs">Sample Runs</a>
</li>
<li>
<a href="#Highlights">Highlights</a>
</li>
</ul>
</li>
</ul>
</div>
<a name="N1000D"></a><a name="Purpose"></a>
<h2 class="h3">Purpose</h2>
<div class="section">
<p>This document comprehensively describes all user-facing facets of the
Hadoop Map-Reduce framework and serves as a tutorial.
</p>
</div>
<a name="N10017"></a><a name="Pre-requisites"></a>
<h2 class="h3">Pre-requisites</h2>
<div class="section">
<p>Ensure that Hadoop is installed, configured and is running. More
details:</p>
<ul>
<li>
Hadoop <a href="quickstart.html">Quickstart</a> for first-time users.
</li>
<li>
Hadoop <a href="cluster_setup.html">Cluster Setup</a> for large,
distributed clusters.
</li>
</ul>
</div>
<a name="N10032"></a><a name="Overview"></a>
<h2 class="h3">Overview</h2>
<div class="section">
<p>Hadoop Map-Reduce is a software framework for easily writing
applications which process vast amounts of data (multi-terabyte data-sets)
in-parallel on large clusters (thousands of nodes) of commodity
hardware in a reliable, fault-tolerant manner.</p>
<p>A Map-Reduce <em>job</em> usually splits the input data-set into
independent chunks which are processed by the <em>map tasks</em> in a
completely parallel manner. The framework sorts the outputs of the maps,
which are then input to the <em>reduce tasks</em>. Typically both the
input and the output of the job are stored in a file-system. The framework
takes care of scheduling tasks, monitoring them and re-executes the failed
tasks.</p>
<p>Typically the compute nodes and the storage nodes are the same, that is,
the Map-Reduce framework and the <a href="hdfs_design.html">Distributed
FileSystem</a> are running on the same set of nodes. This configuration
allows the framework to effectively schedule tasks on the nodes where data
is already present, resulting in very high aggregate bandwidth across the
cluster.</p>
<p>The Map-Reduce framework consists of a single master
<span class="codefrag">JobTracker</span> and one slave <span class="codefrag">TaskTracker</span> per
cluster-node. The master is responsible for scheduling the jobs' component
tasks on the slaves, monitoring them and re-executing the failed tasks. The
slaves execute the tasks as directed by the master.</p>
<p>Minimally, applications specify the input/output locations and supply
<em>map</em> and <em>reduce</em> functions via implementations of
appropriate interfaces and/or abstract-classes. These, and other job
parameters, comprise the <em>job configuration</em>. The Hadoop
<em>job client</em> then submits the job (jar/executable etc.) and
configuration to the <span class="codefrag">JobTracker</span> which then assumes the
responsibility of distributing the software/configuration to the slaves,
scheduling tasks and monitoring them, providing status and diagnostic
information to the job-client.</p>
<p>Although the Hadoop framework is implemented in Java<sup>TM</sup>,
Map-Reduce applications need not be written in Java.</p>
<ul>
<li>
<a href="api/org/apache/hadoop/streaming/package-summary.html">
Hadoop Streaming</a> is a utility which allows users to create and run
jobs with any executables (e.g. shell utilities) as the mapper and/or
the reducer.
</li>
<li>
<a href="api/org/apache/hadoop/mapred/pipes/package-summary.html">
Hadoop Pipes</a> is a <a href="http://www.swig.org/">SWIG</a>-
compatible <em>C++ API</em> to implement Map-Reduce applications (non
JNI<sup>TM</sup> based).
</li>
</ul>
</div>
<a name="N1008B"></a><a name="Inputs+and+Outputs"></a>
<h2 class="h3">Inputs and Outputs</h2>
<div class="section">
<p>The Map-Reduce framework operates exclusively on
<span class="codefrag">&lt;key, value&gt;</span> pairs, that is, the framework views the
input to the job as a set of <span class="codefrag">&lt;key, value&gt;</span> pairs and
produces a set of <span class="codefrag">&lt;key, value&gt;</span> pairs as the output of
the job, conceivably of different types.</p>
<p>The <span class="codefrag">key</span> and <span class="codefrag">value</span> classes have to be
serializable by the framework and hence need to implement the
<a href="api/org/apache/hadoop/io/Writable.html">Writable</a>
interface. Additionally, the <span class="codefrag">key</span> classes have to implement the
<a href="api/org/apache/hadoop/io/WritableComparable.html">
WritableComparable</a> interface to facilitate sorting by the framework.
</p>
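<p>For illustration, a minimal custom key type might look like the sketch
below. The class name <span class="codefrag">MyPair</span> is purely hypothetical;
the raw (non-generic) form of the interface is used to keep the sketch short.</p>
<p>
<span class="codefrag">import java.io.*;</span>
<br>
<span class="codefrag">import org.apache.hadoop.io.WritableComparable;</span>
<br>
<br>
<span class="codefrag">public class MyPair implements WritableComparable {</span>
<br>
&nbsp;&nbsp;<span class="codefrag">private int left;</span>
<br>
&nbsp;&nbsp;<span class="codefrag">private int right;</span>
<br>
&nbsp;&nbsp;<span class="codefrag">public void write(DataOutput out) throws IOException {</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">out.writeInt(left); out.writeInt(right);</span>
<br>
&nbsp;&nbsp;<span class="codefrag">}</span>
<br>
&nbsp;&nbsp;<span class="codefrag">public void readFields(DataInput in) throws IOException {</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">left = in.readInt(); right = in.readInt();</span>
<br>
&nbsp;&nbsp;<span class="codefrag">}</span>
<br>
&nbsp;&nbsp;<span class="codefrag">public int compareTo(Object o) {</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">MyPair p = (MyPair) o;</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">if (left != p.left) { return (left &lt; p.left) ? -1 : 1; }</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">return (right &lt; p.right) ? -1 : ((right == p.right) ? 0 : 1);</span>
<br>
&nbsp;&nbsp;<span class="codefrag">}</span>
<br>
<span class="codefrag">}</span>
</p>
<p>A real key type would typically also override <span class="codefrag">hashCode()</span>
so that the default <span class="codefrag">HashPartitioner</span> distributes keys sensibly.</p>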
<p>Input and Output types of a Map-Reduce job:</p>
<p>
(input) <span class="codefrag">&lt;k1, v1&gt;</span>
-&gt;
<strong>map</strong>
-&gt;
<span class="codefrag">&lt;k2, v2&gt;</span>
-&gt;
<strong>combine</strong>
-&gt;
<span class="codefrag">&lt;k2, v2&gt;</span>
-&gt;
<strong>reduce</strong>
-&gt;
<span class="codefrag">&lt;k3, v3&gt;</span> (output)
</p>
</div>
<a name="N100CD"></a><a name="Example%3A+WordCount+v1.0"></a>
<h2 class="h3">Example: WordCount v1.0</h2>
<div class="section">
<p>Before we jump into the details, let's walk through an example Map-Reduce
application to get a flavour for how it works.</p>
<p>
<span class="codefrag">WordCount</span> is a simple application that counts the number of
occurences of each word in a given input set.</p>
<p>This works with a
<a href="quickstart.html#Standalone+Operation">local-standalone</a>,
<a href="quickstart.html#SingleNodeSetup">pseudo-distributed</a> or
<a href="quickstart.html#Fully-Distributed+Operation">fully-distributed</a>
Hadoop installation.</p>
<a name="N100EA"></a><a name="Source+Code"></a>
<h3 class="h4">Source Code</h3>
<table class="ForrestTable" cellspacing="1" cellpadding="4">
<tr>
<th colspan="1" rowspan="1"></th>
<th colspan="1" rowspan="1">WordCount.java</th>
</tr>
<tr>
<td colspan="1" rowspan="1">1.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">package org.myorg;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">2.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">3.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.io.IOException;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">4.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">5.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">6.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.fs.Path;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">7.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.conf.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">8.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.io.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">9.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.mapred.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">10.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">11.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">12.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">public class WordCount {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">13.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">14.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Map extends MapReduceBase
implements Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">15.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
private final static IntWritable one = new IntWritable(1);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">16.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private Text word = new Text();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">17.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">18.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void map(LongWritable key, Text value,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">19.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">String line = value.toString();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">20.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">StringTokenizer tokenizer = new StringTokenizer(line);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">21.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (tokenizer.hasMoreTokens()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">22.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">word.set(tokenizer.nextToken());</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">23.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(word, one);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">24.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">25.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">26.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">27.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">28.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Reduce extends MapReduceBase implements
Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">29.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void reduce(Text key, Iterator&lt;IntWritable&gt; values,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">30.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">int sum = 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">31.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (values.hasNext()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">32.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">sum += values.next().get();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">33.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">34.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(key, new IntWritable(sum));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">35.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">36.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">37.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">38.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static void main(String[] args) throws Exception {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">39.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
JobConf conf = new JobConf(WordCount.class);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">40.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setJobName("wordcount");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">41.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">42.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputKeyClass(Text.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">43.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputValueClass(IntWritable.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">44.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">45.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setMapperClass(Map.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">46.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setCombinerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">47.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setReducerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">48.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">49.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setInputFormat(TextInputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">50.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputFormat(TextOutputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">51.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">52.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileInputFormat.setInputPaths(conf, new Path(args[0]));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">53.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileOutputFormat.setOutputPath(conf, new Path(args[1]));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">54.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">55.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">JobClient.runJob(conf);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">57.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">58.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">59.</td>
<td colspan="1" rowspan="1"></td>
</tr>
</table>
<a name="N1046C"></a><a name="Usage"></a>
<h3 class="h4">Usage</h3>
<p>Assuming <span class="codefrag">HADOOP_HOME</span> is the root of the installation and
<span class="codefrag">HADOOP_VERSION</span> is the Hadoop version installed, compile
<span class="codefrag">WordCount.java</span> and create a jar:</p>
<p>
<span class="codefrag">$ mkdir wordcount_classes</span>
<br>
<span class="codefrag">
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar
-d wordcount_classes WordCount.java
</span>
<br>
<span class="codefrag">$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .</span>
</p>
<p>Assuming that:</p>
<ul>
<li>
<span class="codefrag">/usr/joe/wordcount/input</span> - input directory in HDFS
</li>
<li>
<span class="codefrag">/usr/joe/wordcount/output</span> - output directory in HDFS
</li>
</ul>
<p>Sample text-files as input:</p>
<p>
<span class="codefrag">$ bin/hadoop dfs -ls /usr/joe/wordcount/input/</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file02</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">Hello World Bye World</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02</span>
<br>
<span class="codefrag">Hello Hadoop Goodbye Hadoop</span>
</p>
<p>Run the application:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
/usr/joe/wordcount/input /usr/joe/wordcount/output
</span>
</p>
<p>Output:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">Bye 1</span>
<br>
<span class="codefrag">Goodbye 1</span>
<br>
<span class="codefrag">Hadoop 2</span>
<br>
<span class="codefrag">Hello 2</span>
<br>
<span class="codefrag">World 2</span>
<br>
</p>
<a name="N104EC"></a><a name="Walk-through"></a>
<h3 class="h4">Walk-through</h3>
<p>The <span class="codefrag">WordCount</span> application is quite straight-forward.</p>
<p>The <span class="codefrag">Mapper</span> implementation (lines 14-26), via the
<span class="codefrag">map</span> method (lines 18-25), processes one line at a time,
as provided by the specified <span class="codefrag">TextInputFormat</span> (line 49).
It then splits the line into tokens separated by whitespaces, via the
<span class="codefrag">StringTokenizer</span>, and emits a key-value pair of
<span class="codefrag">&lt; &lt;word&gt;, 1&gt;</span>.</p>
<p>
For the given sample input the first map emits:<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
<span class="codefrag">&lt; World, 1&gt;</span>
<br>
<span class="codefrag">&lt; Bye, 1&gt;</span>
<br>
<span class="codefrag">&lt; World, 1&gt;</span>
<br>
</p>
<p>
The second map emits:<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 1&gt;</span>
<br>
<span class="codefrag">&lt; Goodbye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 1&gt;</span>
<br>
</p>
<p>We'll learn more about the number of maps spawned for a given job, and
how to control them in a fine-grained manner, a bit later in the
tutorial.</p>
<p>
<span class="codefrag">WordCount</span> also specifies a <span class="codefrag">combiner</span> (line
46). Hence, the output of each map is passed through the local combiner
(which is the same as the <span class="codefrag">Reducer</span>, as per the job
configuration) for local aggregation, after being sorted on the
<em>key</em>s.</p>
<p>
The output of the first map:<br>
<span class="codefrag">&lt; Bye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
<span class="codefrag">&lt; World, 2&gt;</span>
<br>
</p>
<p>
The output of the second map:<br>
<span class="codefrag">&lt; Goodbye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 2&gt;</span>
<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
</p>
<p>The <span class="codefrag">Reducer</span> implementation (lines 28-36), via the
<span class="codefrag">reduce</span> method (lines 29-35) just sums up the values,
which are the occurence counts for each key (i.e. words in this example).
</p>
<p>
Thus the output of the job is:<br>
<span class="codefrag">&lt; Bye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Goodbye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 2&gt;</span>
<br>
<span class="codefrag">&lt; Hello, 2&gt;</span>
<br>
<span class="codefrag">&lt; World, 2&gt;</span>
<br>
</p>
<p>The <span class="codefrag">run</span> method specifies various facets of the job, such
as the input/output paths (passed via the command line), key/value
types, input/output formats etc., in the <span class="codefrag">JobConf</span>.
It then calls the <span class="codefrag">JobClient.runJob</span> (line 55) to submit the
and monitor its progress.</p>
<p>We'll learn more about <span class="codefrag">JobConf</span>, <span class="codefrag">JobClient</span>,
<span class="codefrag">Tool</span> and other interfaces and classes a bit later in the
tutorial.</p>
</div>
<a name="N105A3"></a><a name="Map-Reduce+-+User+Interfaces"></a>
<h2 class="h3">Map-Reduce - User Interfaces</h2>
<div class="section">
<p>This section provides a reasonable amount of detail on every user-facing
aspect of the Map-Reduce framework. This should help users implement,
configure and tune their jobs in a fine-grained manner. However, please
note that the javadoc for each class/interface remains the most
comprehensive documentation available; this is only meant to be a tutorial.
</p>
<p>Let us first take the <span class="codefrag">Mapper</span> and <span class="codefrag">Reducer</span>
interfaces. Applications typically implement them to provide the
<span class="codefrag">map</span> and <span class="codefrag">reduce</span> methods.</p>
<p>We will then discuss other core interfaces including
<span class="codefrag">JobConf</span>, <span class="codefrag">JobClient</span>, <span class="codefrag">Partitioner</span>,
<span class="codefrag">OutputCollector</span>, <span class="codefrag">Reporter</span>,
<span class="codefrag">InputFormat</span>, <span class="codefrag">OutputFormat</span> and others.</p>
<p>Finally, we will wrap up by discussing some useful features of the
framework such as the <span class="codefrag">DistributedCache</span>,
<span class="codefrag">IsolationRunner</span> etc.</p>
<a name="N105DC"></a><a name="Payload"></a>
<h3 class="h4">Payload</h3>
<p>Applications typically implement the <span class="codefrag">Mapper</span> and
<span class="codefrag">Reducer</span> interfaces to provide the <span class="codefrag">map</span> and
<span class="codefrag">reduce</span> methods. These form the core of the job.</p>
<a name="N105F1"></a><a name="Mapper"></a>
<h4>Mapper</h4>
<p>
<a href="api/org/apache/hadoop/mapred/Mapper.html">
Mapper</a> maps input key/value pairs to a set of intermediate
key/value pairs.</p>
<p>Maps are the individual tasks that transform input records into
intermediate records. The transformed intermediate records do not need
to be of the same type as the input records. A given input pair may
map to zero or many output pairs.</p>
<p>The Hadoop Map-Reduce framework spawns one map task for each
<span class="codefrag">InputSplit</span> generated by the <span class="codefrag">InputFormat</span> for
the job.</p>
<p>Overall, <span class="codefrag">Mapper</span> implementations are passed the
<span class="codefrag">JobConf</span> for the job via the
<a href="api/org/apache/hadoop/mapred/JobConfigurable.html#configure(org.apache.hadoop.mapred.JobConf)">
JobConfigurable.configure(JobConf)</a> method and can override it to
initialize themselves. The framework then calls
<a href="api/org/apache/hadoop/mapred/Mapper.html#map(K1, V1, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)">
map(WritableComparable, Writable, OutputCollector, Reporter)</a> for
each key/value pair in the <span class="codefrag">InputSplit</span> for that task.
Applications can then override the
<a href="api/org/apache/hadoop/io/Closeable.html#close()">
Closeable.close()</a> method to perform any required cleanup.</p>
<p>Output pairs do not need to be of the same types as input pairs. A
given input pair may map to zero or many output pairs. Output pairs
are collected with calls to
<a href="api/org/apache/hadoop/mapred/OutputCollector.html#collect(K, V)">
OutputCollector.collect(WritableComparable,Writable)</a>.</p>
<p>Applications can use the <span class="codefrag">Reporter</span> to report
progress, set application-level status messages and update
<span class="codefrag">Counters</span>, or just indicate that they are alive.</p>
<p>All intermediate values associated with a given output key are
subsequently grouped by the framework, and passed to the
<span class="codefrag">Reducer</span>(s) to determine the final output. Users can
control the grouping by specifying a <span class="codefrag">Comparator</span> via
<a href="api/org/apache/hadoop/mapred/JobConf.html#setOutputKeyComparatorClass(java.lang.Class)">
JobConf.setOutputKeyComparatorClass(Class)</a>.</p>
<p>The <span class="codefrag">Mapper</span> outputs are sorted and then
partitioned per <span class="codefrag">Reducer</span>. The total number of partitions is
the same as the number of reduce tasks for the job. Users can control
which keys (and hence records) go to which <span class="codefrag">Reducer</span> by
implementing a custom <span class="codefrag">Partitioner</span>.</p>
<p>Users can optionally specify a <span class="codefrag">combiner</span>, via
<a href="api/org/apache/hadoop/mapred/JobConf.html#setCombinerClass(java.lang.Class)">
JobConf.setCombinerClass(Class)</a>, to perform local aggregation of
the intermediate outputs, which helps to cut down the amount of data
transferred from the <span class="codefrag">Mapper</span> to the <span class="codefrag">Reducer</span>.
</p>
<p>The intermediate, sorted outputs are always stored in files of
<a href="api/org/apache/hadoop/io/SequenceFile.html">
SequenceFile</a> format. Applications can control if, and how, the
intermediate outputs are to be compressed and the
<a href="api/org/apache/hadoop/io/compress/CompressionCodec.html">
CompressionCodec</a> to be used via the <span class="codefrag">JobConf</span>.
</p>
<a name="N1066B"></a><a name="How+Many+Maps%3F"></a>
<h5>How Many Maps?</h5>
<p>The number of maps is usually driven by the total size of the
inputs, that is, the total number of blocks of the input files.</p>
<p>The right level of parallelism for maps seems to be around 10-100
maps per-node, although it has been set up to 300 maps for very
cpu-light map tasks. Task setup takes a while, so it is best if the
maps take at least a minute to execute.</p>
<p>Thus, if you expect 10TB of input data and have a blocksize of
<span class="codefrag">128MB</span>, you'll end up with 82,000 maps, unless
<a href="api/org/apache/hadoop/mapred/JobConf.html#setNumMapTasks(int)">
setNumMapTasks(int)</a> (which only provides a hint to the framework)
is used to set it even higher.</p>
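<p>As a rough sketch of the arithmetic above (the figures are purely
illustrative, and <span class="codefrag">conf</span> is assumed to be the job's
<span class="codefrag">JobConf</span> as in the WordCount example):</p>
<p>
<span class="codefrag">long inputBytes = 10L * 1024 * 1024 * 1024 * 1024; // ~10TB of input</span>
<br>
<span class="codefrag">long blockSize = 128 * 1024 * 1024; // 128MB blocks</span>
<br>
<span class="codefrag">int maps = (int) (inputBytes / blockSize); // ~81,920 maps</span>
<br>
<span class="codefrag">conf.setNumMapTasks(maps); // only a hint to the framework</span>
</p>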
<a name="N10683"></a><a name="Reducer"></a>
<h4>Reducer</h4>
<p>
<a href="api/org/apache/hadoop/mapred/Reducer.html">
Reducer</a> reduces a set of intermediate values which share a key to
a smaller set of values.</p>
<p>The number of reduces for the job is set by the user
via <a href="api/org/apache/hadoop/mapred/JobConf.html#setNumReduceTasks(int)">
JobConf.setNumReduceTasks(int)</a>.</p>
<p>Overall, <span class="codefrag">Reducer</span> implementations are passed the
<span class="codefrag">JobConf</span> for the job via the
<a href="api/org/apache/hadoop/mapred/JobConfigurable.html#configure(org.apache.hadoop.mapred.JobConf)">
JobConfigurable.configure(JobConf)</a> method and can override it to
initialize themselves. The framework then calls
<a href="api/org/apache/hadoop/mapred/Reducer.html#reduce(K2, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)">
reduce(WritableComparable, Iterator, OutputCollector, Reporter)</a>
method for each <span class="codefrag">&lt;key, (list of values)&gt;</span>
pair in the grouped inputs. Applications can then override the
<a href="api/org/apache/hadoop/io/Closeable.html#close()">
Closeable.close()</a> method to perform any required cleanup.</p>
<p>
<span class="codefrag">Reducer</span> has 3 primary phases: shuffle, sort and reduce.
</p>
<a name="N106B3"></a><a name="Shuffle"></a>
<h5>Shuffle</h5>
<p>Input to the <span class="codefrag">Reducer</span> is the sorted output of the
mappers. In this phase the framework fetches the relevant partition
of the output of all the mappers, via HTTP.</p>
<a name="N106C0"></a><a name="Sort"></a>
<h5>Sort</h5>
<p>The framework groups <span class="codefrag">Reducer</span> inputs by keys (since
different mappers may have output the same key) in this stage.</p>
<p>The shuffle and sort phases occur simultaneously; while
map-outputs are being fetched they are merged.</p>
<a name="N106CF"></a><a name="Secondary+Sort"></a>
<h5>Secondary Sort</h5>
<p>If equivalence rules for grouping the intermediate keys are
required to be different from those for grouping keys before
reduction, then one may specify a <span class="codefrag">Comparator</span> via
<a href="api/org/apache/hadoop/mapred/JobConf.html#setOutputValueGroupingComparator(java.lang.Class)">
JobConf.setOutputValueGroupingComparator(Class)</a>. Since
<a href="api/org/apache/hadoop/mapred/JobConf.html#setOutputKeyComparatorClass(java.lang.Class)">
JobConf.setOutputKeyComparatorClass(Class)</a> can be used to
control how intermediate keys are sorted, these can be used in
conjunction to simulate <em>secondary sort on values</em>.</p>
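<p>As a sketch, an application wanting a secondary sort on values might
configure the two comparators as below; the comparator class names are
hypothetical and would typically extend
<span class="codefrag">WritableComparator</span>:</p>
<p>
<span class="codefrag">// sort on the full (primary + secondary) key during the shuffle</span>
<br>
<span class="codefrag">conf.setOutputKeyComparatorClass(FullKeyComparator.class);</span>
<br>
<span class="codefrag">// group values for a single reduce() call by the primary part only</span>
<br>
<span class="codefrag">conf.setOutputValueGroupingComparator(PrimaryKeyGroupingComparator.class);</span>
</p>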
<a name="N106E8"></a><a name="Reduce"></a>
<h5>Reduce</h5>
<p>In this phase the
<a href="api/org/apache/hadoop/mapred/Reducer.html#reduce(K2, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)">
reduce(WritableComparable, Iterator, OutputCollector, Reporter)</a>
method is called for each <span class="codefrag">&lt;key, (list of values)&gt;</span>
pair in the grouped inputs.</p>
<p>The output of the reduce task is typically written to the
<a href="api/org/apache/hadoop/fs/FileSystem.html">
FileSystem</a> via
<a href="api/org/apache/hadoop/mapred/OutputCollector.html#collect(K, V)">
OutputCollector.collect(WritableComparable, Writable)</a>.</p>
<p>Applications can use the <span class="codefrag">Reporter</span> to report
progress, set application-level status messages and update
<span class="codefrag">Counters</span>, or just indicate that they are alive.</p>
<p>The output of the <span class="codefrag">Reducer</span> is <em>not sorted</em>.</p>
<a name="N10716"></a><a name="How+Many+Reduces%3F"></a>
<h5>How Many Reduces?</h5>
<p>The right number of reduces seems to be <span class="codefrag">0.95</span> or
<span class="codefrag">1.75</span> multiplied by (&lt;<em>no. of nodes</em>&gt; *
<span class="codefrag">mapred.tasktracker.reduce.tasks.maximum</span>).</p>
<p>With <span class="codefrag">0.95</span> all of the reduces can launch immediately
and start transferring map outputs as the maps finish. With
<span class="codefrag">1.75</span> the faster nodes will finish their first round of
reduces and launch a second wave of reduces doing a much better job
of load balancing.</p>
<p>Increasing the number of reduces increases the framework overhead,
but improves load balancing and lowers the cost of failures.</p>
<p>The scaling factors above are slightly less than whole numbers to
reserve a few reduce slots in the framework for speculative-tasks and
failed tasks.</p>
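<p>To make the arithmetic concrete, a small sketch (the node count and the
per-node reduce slots are illustrative assumptions):</p>
<p>
<span class="codefrag">int nodes = 100; // TaskTracker nodes in the cluster (assumed)</span>
<br>
<span class="codefrag">int reduceSlotsPerNode = 2; // mapred.tasktracker.reduce.tasks.maximum (assumed)</span>
<br>
<span class="codefrag">// 0.95 lets every reduce launch at once; 1.75 yields a second, load-balancing wave</span>
<br>
<span class="codefrag">int numReduces = (int) (0.95 * nodes * reduceSlotsPerNode); // 190 here</span>
<br>
<span class="codefrag">conf.setNumReduceTasks(numReduces);</span>
</p>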
<a name="N1073B"></a><a name="Reducer+NONE"></a>
<h5>Reducer NONE</h5>
<p>It is legal to set the number of reduce-tasks to <em>zero</em> if
no reduction is desired.</p>
<p>In this case the outputs of the map-tasks go directly to the
<span class="codefrag">FileSystem</span>, into the output path set by
<a href="api/org/apache/hadoop/mapred/FileInputFormat.html#setOutputPath(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path)">
setOutputPath(Path)</a>. The framework does not sort the
map-outputs before writing them out to the <span class="codefrag">FileSystem</span>.
</p>
<a name="N10756"></a><a name="Partitioner"></a>
<h4>Partitioner</h4>
<p>
<a href="api/org/apache/hadoop/mapred/Partitioner.html">
Partitioner</a> partitions the key space.</p>
<p>Partitioner controls the partitioning of the keys of the
intermediate map-outputs. The key (or a subset of the key) is used to
derive the partition, typically by a <em>hash function</em>. The total
number of partitions is the same as the number of reduce tasks for the
job. Hence this controls which of the <span class="codefrag">m</span> reduce tasks the
intermediate key (and hence the record) is sent to for reduction.</p>
<p>
<a href="api/org/apache/hadoop/mapred/lib/HashPartitioner.html">
HashPartitioner</a> is the default <span class="codefrag">Partitioner</span>.</p>
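<p>For illustration only, here is a minimal custom
<span class="codefrag">Partitioner</span> that routes records by the first
tab-separated field of a <span class="codefrag">Text</span> key; the class name and
the key layout are assumptions, not part of the framework:</p>
<p>
<span class="codefrag">import org.apache.hadoop.io.*;</span>
<br>
<span class="codefrag">import org.apache.hadoop.mapred.*;</span>
<br>
<br>
<span class="codefrag">public class FirstFieldPartitioner implements Partitioner&lt;Text, IntWritable&gt; {</span>
<br>
&nbsp;&nbsp;<span class="codefrag">public void configure(JobConf job) { }</span>
<br>
&nbsp;&nbsp;<span class="codefrag">public int getPartition(Text key, IntWritable value, int numPartitions) {</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">String first = key.toString().split("\t")[0];</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">// mask the sign bit so the partition index is non-negative</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">return (first.hashCode() &amp; Integer.MAX_VALUE) % numPartitions;</span>
<br>
&nbsp;&nbsp;<span class="codefrag">}</span>
<br>
<span class="codefrag">}</span>
</p>
<p>Such a class would be set on the job via
<span class="codefrag">conf.setPartitionerClass(FirstFieldPartitioner.class)</span>.</p>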
<a name="N10775"></a><a name="Reporter"></a>
<h4>Reporter</h4>
<p>
<a href="api/org/apache/hadoop/mapred/Reporter.html">
Reporter</a> is a facility for Map-Reduce applications to report
progress, set application-level status messages and update
<span class="codefrag">Counters</span>.</p>
<p>
<span class="codefrag">Mapper</span> and <span class="codefrag">Reducer</span> implementations can use
the <span class="codefrag">Reporter</span> to report progress or just indicate
that they are alive. In scenarios where the application takes a
significant amount of time to process individual key/value pairs,
this is crucial since the framework might assume that the task has
timed-out and kill that task. Another way to avoid this is to
set the configuration parameter <span class="codefrag">mapred.task.timeout</span> to a
high-enough value (or even set it to <em>zero</em> for no time-outs).
</p>
<p>Applications can also update <span class="codefrag">Counters</span> using the
<span class="codefrag">Reporter</span>.</p>
<a name="N1079F"></a><a name="OutputCollector"></a>
<h4>OutputCollector</h4>
<p>
<a href="api/org/apache/hadoop/mapred/OutputCollector.html">
OutputCollector</a> is a generalization of the facility provided by
the Map-Reduce framework to collect data output by the
<span class="codefrag">Mapper</span> or the <span class="codefrag">Reducer</span> (either the
intermediate outputs or the output of the job).</p>
<p>Hadoop Map-Reduce comes bundled with a
<a href="api/org/apache/hadoop/mapred/lib/package-summary.html">
library</a> of generally useful mappers, reducers, and partitioners.</p>
<a name="N107BA"></a><a name="Job+Configuration"></a>
<h3 class="h4">Job Configuration</h3>
<p>
<a href="api/org/apache/hadoop/mapred/JobConf.html">
JobConf</a> represents a Map-Reduce job configuration.</p>
<p>
<span class="codefrag">JobConf</span> is the primary interface for a user to describe
a map-reduce job to the Hadoop framework for execution. The framework
tries to faithfully execute the job as described by <span class="codefrag">JobConf</span>,
however:</p>
<ul>
<li>
Some configuration parameters may have been marked as
<a href="api/org/apache/hadoop/conf/Configuration.html#FinalParams">
final</a> by administrators and hence cannot be altered.
</li>
<li>
While some job parameters are straight-forward to set (e.g.
<a href="api/org/apache/hadoop/mapred/JobConf.html#setNumReduceTasks(int)">
setNumReduceTasks(int)</a>), other parameters interact subtly with
the rest of the framework and/or job configuration and are
more complex to set (e.g.
<a href="api/org/apache/hadoop/mapred/JobConf.html#setNumMapTasks(int)">
setNumMapTasks(int)</a>).
</li>
</ul>
<p>
<span class="codefrag">JobConf</span> is typically used to specify the
<span class="codefrag">Mapper</span>, combiner (if any), <span class="codefrag">Partitioner</span>,
<span class="codefrag">Reducer</span>, <span class="codefrag">InputFormat</span> and
<span class="codefrag">OutputFormat</span> implementations. <span class="codefrag">JobConf</span> also
indicates the set of input files
(<a href="api/org/apache/hadoop/mapred/FileInputFormat.html#setInputPaths(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path[])">setInputPaths(JobConf, Path...)</a>
/<a href="api/org/apache/hadoop/mapred/FileInputFormat.html#addInputPath(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path)">addInputPath(JobConf, Path)</a>)
or (<a href="api/org/apache/hadoop/mapred/FileInputFormat.html#setInputPaths(org.apache.hadoop.mapred.JobConf,%20java.lang.String)">setInputPaths(JobConf, String)</a>
/<a href="api/org/apache/hadoop/mapred/FileInputFormat.html#addInputPaths(org.apache.hadoop.mapred.JobConf,%20java.lang.String)">addInputPaths(JobConf, String)</a>)
and where the output files should be written
(<a href="api/org/apache/hadoop/mapred/FileOutputFormat.html#setOutputPath(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path)">setOutputPath(Path)</a>).</p>
<p>Optionally, <span class="codefrag">JobConf</span> is used to specify other advanced
facets of the job such as the <span class="codefrag">Comparator</span> to be used, files
to be put in the <span class="codefrag">DistributedCache</span>, whether intermediate
and/or job outputs are to be compressed (and how), debugging via
user-provided scripts
(<a href="api/org/apache/hadoop/mapred/JobConf.html#setMapDebugScript(java.lang.String)">setMapDebugScript(String)</a>/<a href="api/org/apache/hadoop/mapred/JobConf.html#setReduceDebugScript(java.lang.String)">setReduceDebugScript(String)</a>)
, whether job tasks can be executed in a <em>speculative</em> manner
(<a href="api/org/apache/hadoop/mapred/JobConf.html#setMapSpeculativeExecution(boolean)">setMapSpeculativeExecution(boolean)</a>)/(<a href="api/org/apache/hadoop/mapred/JobConf.html#setReduceSpeculativeExecution(boolean)">setReduceSpeculativeExecution(boolean)</a>)
, maximum number of attempts per task
(<a href="api/org/apache/hadoop/mapred/JobConf.html#setMaxMapAttempts(int)">setMaxMapAttempts(int)</a>/<a href="api/org/apache/hadoop/mapred/JobConf.html#setMaxReduceAttempts(int)">setMaxReduceAttempts(int)</a>)
, percentage of task failures which can be tolerated by the job
(<a href="api/org/apache/hadoop/mapred/JobConf.html#setMaxMapTaskFailuresPercent(int)">setMaxMapTaskFailuresPercent(int)</a>/<a href="api/org/apache/hadoop/mapred/JobConf.html#setMaxReduceTaskFailuresPercent(int)">setMaxReduceTaskFailuresPercent(int)</a>)
etc.</p>
<p>Of course, users can use
<a href="api/org/apache/hadoop/conf/Configuration.html#set(java.lang.String, java.lang.String)">set(String, String)</a>/<a href="api/org/apache/hadoop/conf/Configuration.html#get(java.lang.String, java.lang.String)">get(String, String)</a>
to set/get arbitrary parameters needed by applications. However, use the
<span class="codefrag">DistributedCache</span> for large amounts of (read-only) data.</p>
<a name="N1084C"></a><a name="Task+Execution+%26+Environment"></a>
<h3 class="h4">Task Execution &amp; Environment</h3>
<p>The <span class="codefrag">TaskTracker</span> executes the <span class="codefrag">Mapper</span>/
<span class="codefrag">Reducer</span> <em>task</em> as a child process in a separate jvm.
</p>
<p>The child-task inherits the environment of the parent
<span class="codefrag">TaskTracker</span>. The user can specify additional options to the
child-jvm via the <span class="codefrag">mapred.child.java.opts</span> configuration
parameter in the <span class="codefrag">JobConf</span> such as non-standard paths for the
run-time linker to search shared libraries via
<span class="codefrag">-Djava.library.path=&lt;&gt;</span> etc. If the
<span class="codefrag">mapred.child.java.opts</span> contains the symbol <em>@taskid@</em>
it is interpolated with the value of the <span class="codefrag">taskid</span> of the map/reduce
task.</p>
<p>Here is an example with multiple arguments and substitutions,
showing jvm GC logging and the start of a passwordless JVM JMX agent so that
one can connect with jconsole and the like to watch child memory and
threads, and get thread dumps. It also sets the maximum heap-size of the
child jvm to 512MB and adds an additional path to the
<span class="codefrag">java.library.path</span> of the child-jvm.</p>
<p>
<span class="codefrag">&lt;property&gt;</span>
<br>
&nbsp;&nbsp;<span class="codefrag">&lt;name&gt;mapred.child.java.opts&lt;/name&gt;</span>
<br>
&nbsp;&nbsp;<span class="codefrag">&lt;value&gt;</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">
-Xmx512M -Djava.library.path=/home/mycompany/lib
-verbose:gc -Xloggc:/tmp/@taskid@.gc</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false</span>
<br>
&nbsp;&nbsp;<span class="codefrag">&lt;/value&gt;</span>
<br>
<span class="codefrag">&lt;/property&gt;</span>
</p>
<p>Users/admins can also specify the maximum virtual memory
of the launched child-task using <span class="codefrag">mapred.child.ulimit</span>.</p>
<p>When the job starts, the localized job directory
<span class="codefrag"> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</span>
has the following directories: </p>
<ul>
<li> A job-specific shared directory, created at location
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </span>.
This directory is exposed to users through
<span class="codefrag">job.local.dir</span>. Tasks can use this space as scratch
space and share files among themselves. The directory can be accessed through the
<a href="api/org/apache/hadoop/mapred/JobConf.html#getJobLocalDir()">
JobConf.getJobLocalDir()</a> api. It is also available as a system property,
so users can call <span class="codefrag">System.getProperty("job.local.dir")</span>.
</li>
<li>A jars directory, which contains the job's jar file and the expanded jar </li>
<li>A job.xml file, the generic job configuration </li>
<li>Each task has a directory <span class="codefrag">task-id</span> which in turn has the
following structure
<ul>
<li>A job.xml file, the task-localized job configuration </li>
<li>A directory for intermediate output files</li>
<li>The working directory of the task, which has a temporary
sub-directory for creating temporary files</li>
</ul>
</li>
</ul>
<p>The <a href="#DistributedCache">DistributedCache</a> can also be used
as a rudimentary software distribution mechanism for use in the map
and/or reduce tasks. It can be used to distribute both jars and
native libraries. The
<a href="api/org/apache/hadoop/filecache/DistributedCache.html#addArchiveToClassPath(org.apache.hadoop.fs.Path,%20org.apache.hadoop.conf.Configuration)">
DistributedCache.addArchiveToClassPath(Path, Configuration)</a> or
<a href="api/org/apache/hadoop/filecache/DistributedCache.html#addFileToClassPath(org.apache.hadoop.fs.Path,%20org.apache.hadoop.conf.Configuration)">
DistributedCache.addFileToClassPath(Path, Configuration)</a> api can
be used to cache files/jars and also add them to the <em>classpath</em>
of child-jvm. Similarly the facility provided by the
<span class="codefrag">DistributedCache</span> where-in it symlinks the cached files into
the working directory of the task can be used to distribute native
libraries and load them. The underlying detail is that child-jvm always
has its <em>current working directory</em> added to the
<span class="codefrag">java.library.path</span> and hence the cached libraries can be
loaded via <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#loadLibrary(java.lang.String)">
System.loadLibrary</a> or <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#load(java.lang.String)">
System.load</a>.</p>
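<p>A sketch of both uses follows; the jar and library names, and the paths, are
made up for illustration, and exception handling is omitted
(<span class="codefrag">new URI(...)</span> can throw):</p>
<p>
<span class="codefrag">// add a jar stored in HDFS to the classpath of the child-jvm</span>
<br>
<span class="codefrag">DistributedCache.addFileToClassPath(new Path("/myapp/lib/mylib.jar"), conf);</span>
<br>
<br>
<span class="codefrag">// symlink a cached native library into the task's working directory ...</span>
<br>
<span class="codefrag">DistributedCache.createSymlink(conf);</span>
<br>
<span class="codefrag">DistributedCache.addCacheFile(new URI("/myapp/lib/libfoo.so#libfoo.so"), conf);</span>
<br>
<br>
<span class="codefrag">// ... and load it from the Mapper/Reducer, e.g. in configure()</span>
<br>
<span class="codefrag">System.loadLibrary("foo");</span>
</p>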
<a name="N108F8"></a><a name="Job+Submission+and+Monitoring"></a>
<h3 class="h4">Job Submission and Monitoring</h3>
<p>
<a href="api/org/apache/hadoop/mapred/JobClient.html">
JobClient</a> is the primary interface by which user-job interacts
with the <span class="codefrag">JobTracker</span>.</p>
<p>
<span class="codefrag">JobClient</span> provides facilities to submit jobs, track their
progress, access component-tasks' reports/logs, get the Map-Reduce
cluster's status information and so on.</p>
<p>The job submission process involves:</p>
<ol>
<li>Checking the input and output specifications of the job.</li>
<li>Computing the <span class="codefrag">InputSplit</span> values for the job.</li>
<li>
Setting up the requisite accounting information for the
<span class="codefrag">DistributedCache</span> of the job, if necessary.
</li>
<li>
Copying the job's jar and configuration to the map-reduce system
directory on the <span class="codefrag">FileSystem</span>.
</li>
<li>
Submitting the job to the <span class="codefrag">JobTracker</span> and optionally
monitoring its status.
</li>
</ol>
<p> Job history files are also logged to the user-specified directory
<span class="codefrag">hadoop.job.history.user.location</span>,
which defaults to the job output directory. The files are stored in
"_logs/history/" in the specified directory. Hence, by default they
will be in mapred.output.dir/_logs/history. Users can stop
logging by giving the value <span class="codefrag">none</span> for
<span class="codefrag">hadoop.job.history.user.location</span>.
</p>
<p> Users can view a summary of the history logs in the specified directory
using the following command <br>
<span class="codefrag">$ bin/hadoop job -history output-dir</span>
<br>
This command will print job details, failed and killed tip
details. <br>
More details about the job such as successful tasks and
task attempts made for each task can be viewed using the
following command <br>
<span class="codefrag">$ bin/hadoop job -history all output-dir</span>
<br>
</p>
<p> Users can use
<a href="api/org/apache/hadoop/mapred/OutputLogFilter.html">OutputLogFilter</a>
to filter log files from the output directory listing. </p>
<p>Normally the user creates the application, describes various facets
of the job via <span class="codefrag">JobConf</span>, and then uses the
<span class="codefrag">JobClient</span> to submit the job and monitor its progress.</p>
<a name="N10958"></a><a name="Job+Control"></a>
<h4>Job Control</h4>
<p>Users may need to chain map-reduce jobs to accomplish complex
tasks which cannot be done via a single map-reduce job. This is fairly
easy since the output of the job typically goes to distributed
file-system, and the output, in turn, can be used as the input for the
next job.</p>
<p>However, this also means that the onus on ensuring jobs are
complete (success/failure) lies squarely on the clients. In such
cases, the various job-control options are:</p>
<ul>
<li>
<a href="api/org/apache/hadoop/mapred/JobClient.html#runJob(org.apache.hadoop.mapred.JobConf)">
runJob(JobConf)</a> : Submits the job and returns only after the
job has completed.
</li>
<li>
<a href="api/org/apache/hadoop/mapred/JobClient.html#submitJob(org.apache.hadoop.mapred.JobConf)">
submitJob(JobConf)</a> : Only submits the job; the application can then poll the
returned handle to the
<a href="api/org/apache/hadoop/mapred/RunningJob.html">
RunningJob</a> to query its status and make scheduling decisions.
</li>
<li>
<a href="api/org/apache/hadoop/mapred/JobConf.html#setJobEndNotificationURI(java.lang.String)">
JobConf.setJobEndNotificationURI(String)</a> : Sets up a
notification upon job-completion, thus avoiding polling.
</li>
</ul>
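<p>As a sketch, chaining two jobs with
<span class="codefrag">runJob</span> might look like the following; the two
<span class="codefrag">JobConf</span> objects and the paths are assumed to have been
set up as in the WordCount example:</p>
<p>
<span class="codefrag">// run the first job to completion</span>
<br>
<span class="codefrag">FileOutputFormat.setOutputPath(firstConf, new Path("/user/joe/step1"));</span>
<br>
<span class="codefrag">JobClient.runJob(firstConf);</span>
<br>
<br>
<span class="codefrag">// feed its output to the second job</span>
<br>
<span class="codefrag">FileInputFormat.setInputPaths(secondConf, new Path("/user/joe/step1"));</span>
<br>
<span class="codefrag">FileOutputFormat.setOutputPath(secondConf, new Path("/user/joe/step2"));</span>
<br>
<span class="codefrag">JobClient.runJob(secondConf);</span>
</p>
<p>With <span class="codefrag">submitJob</span> the client would instead poll the
returned <span class="codefrag">RunningJob</span> handle, allowing several jobs to be
in flight at once.</p>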
<a name="N10982"></a><a name="Job+Input"></a>
<h3 class="h4">Job Input</h3>
<p>
<a href="api/org/apache/hadoop/mapred/InputFormat.html">
InputFormat</a> describes the input-specification for a Map-Reduce job.
</p>
<p>The Map-Reduce framework relies on the <span class="codefrag">InputFormat</span> of
the job to:</p>
<ol>
<li>Validate the input-specification of the job.</li>
<li>
Split-up the input file(s) into logical <span class="codefrag">InputSplit</span>
instances, each of which is then assigned to an individual
<span class="codefrag">Mapper</span>.
</li>
<li>
Provide the <span class="codefrag">RecordReader</span> implementation used to
glean input records from the logical <span class="codefrag">InputSplit</span> for
processing by the <span class="codefrag">Mapper</span>.
</li>
</ol>
<p>The default behavior of file-based <span class="codefrag">InputFormat</span>
implementations, typically sub-classes of
<a href="api/org/apache/hadoop/mapred/FileInputFormat.html">
FileInputFormat</a>, is to split the input into <em>logical</em>
<span class="codefrag">InputSplit</span> instances based on the total size, in bytes, of
the input files. However, the <span class="codefrag">FileSystem</span> blocksize of the
input files is treated as an upper bound for input splits. A lower bound
on the split size can be set via <span class="codefrag">mapred.min.split.size</span>.</p>
<p>Clearly, logical splits based on input-size are insufficient for many
applications since record boundaries must be respected. In such cases,
the application should implement a <span class="codefrag">RecordReader</span>, which is
responsible for respecting record-boundaries and presenting a
record-oriented view of the logical <span class="codefrag">InputSplit</span> to the
individual task.</p>
<p>
<a href="api/org/apache/hadoop/mapred/TextInputFormat.html">
TextInputFormat</a> is the default <span class="codefrag">InputFormat</span>.</p>
<p>If <span class="codefrag">TextInputFormat</span> is the <span class="codefrag">InputFormat</span> for a
given job, the framework detects input-files with the <em>.gz</em> and
<em>.lzo</em> extensions and automatically decompresses them using the
appropriate <span class="codefrag">CompressionCodec</span>. However, it must be noted that
compressed files with the above extensions cannot be <em>split</em> and
each compressed file is processed in its entirety by a single mapper.</p>
<a name="N109EC"></a><a name="InputSplit"></a>
<h4>InputSplit</h4>
<p>
<a href="api/org/apache/hadoop/mapred/InputSplit.html">
InputSplit</a> represents the data to be processed by an individual
<span class="codefrag">Mapper</span>.</p>
<p>Typically <span class="codefrag">InputSplit</span> presents a byte-oriented view of
the input, and it is the responsibility of <span class="codefrag">RecordReader</span>
to process and present a record-oriented view.</p>
<p>
<a href="api/org/apache/hadoop/mapred/FileSplit.html">
FileSplit</a> is the default <span class="codefrag">InputSplit</span>. It sets
<span class="codefrag">map.input.file</span> to the path of the input file for the
logical split.</p>
<a name="N10A11"></a><a name="RecordReader"></a>
<h4>RecordReader</h4>
<p>
<a href="api/org/apache/hadoop/mapred/RecordReader.html">
RecordReader</a> reads <span class="codefrag">&lt;key, value&gt;</span> pairs from an
<span class="codefrag">InputSplit</span>.</p>
<p>Typically the <span class="codefrag">RecordReader</span> converts the byte-oriented
view of the input, provided by the <span class="codefrag">InputSplit</span>, and
presents a record-oriented view to the <span class="codefrag">Mapper</span> implementations
for processing. <span class="codefrag">RecordReader</span> thus assumes the
responsibility of processing record boundaries and presents the tasks
with keys and values.</p>
<a name="N10A34"></a><a name="Job+Output"></a>
<h3 class="h4">Job Output</h3>
<p>
<a href="api/org/apache/hadoop/mapred/OutputFormat.html">
OutputFormat</a> describes the output-specification for a Map-Reduce
job.</p>
<p>The Map-Reduce framework relies on the <span class="codefrag">OutputFormat</span> of
the job to:</p>
<ol>
<li>
Validate the output-specification of the job; for example, check that
the output directory doesn't already exist.
</li>
<li>
Provide the <span class="codefrag">RecordWriter</span> implementation used to
write the output files of the job. Output files are stored in a
<span class="codefrag">FileSystem</span>.
</li>
</ol>
<p>
<span class="codefrag">TextOutputFormat</span> is the default
<span class="codefrag">OutputFormat</span>.</p>
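<p>A corresponding sketch for the output side (the output path is a
hypothetical placeholder; the job fails up-front if it already exists):</p>
<p>
<span class="codefrag">conf.setOutputFormat(TextOutputFormat.class);</span>
<br>
<span class="codefrag">conf.setOutputKeyClass(Text.class);</span>
<br>
<span class="codefrag">conf.setOutputValueClass(IntWritable.class);</span>
<br>
<span class="codefrag">FileOutputFormat.setOutputPath(conf, new Path("/user/joe/output"));</span>
</p>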
<a name="N10A5D"></a><a name="Task+Side-Effect+Files"></a>
<h4>Task Side-Effect Files</h4>
<p>In some applications, component tasks need to create and/or write to
side-files, which differ from the actual job-output files.</p>
<p>In such cases there could be issues with two instances of the same
<span class="codefrag">Mapper</span> or <span class="codefrag">Reducer</span> running simultaneously (for
example, speculative tasks) trying to open and/or write to the same
file (path) on the <span class="codefrag">FileSystem</span>. Hence the
application-writer will have to pick unique names per task-attempt
(using the taskid, say <span class="codefrag">task_200709221812_0001_m_000000_0</span>),
not just per task.</p>
<p>To avoid these issues the Map-Reduce framework maintains a special
<span class="codefrag">${mapred.output.dir}/_temporary/_${taskid}</span> sub-directory
accessible via <span class="codefrag">${mapred.work.output.dir}</span>
for each task-attempt on the <span class="codefrag">FileSystem</span> where the output
of the task-attempt is stored. On successful completion of the
task-attempt, the files in the
<span class="codefrag">${mapred.output.dir}/_temporary/_${taskid}</span> (only)
are <em>promoted</em> to <span class="codefrag">${mapred.output.dir}</span>. Of course,
the framework discards the sub-directory of unsuccessful task-attempts.
This process is completely transparent to the application.</p>
<p>The application-writer can take advantage of this feature by
creating any side-files required in <span class="codefrag">${mapred.work.output.dir}</span>
during execution of a task via
<a href="api/org/apache/hadoop/mapred/FileOutputFormat.html#getWorkOutputPath(org.apache.hadoop.mapred.JobConf)">
FileOutputFormat.getWorkOutputPath()</a>, and the framework will promote them
similarly for successful task-attempts, thus eliminating the need to
pick unique paths per task-attempt.</p>
<p>Note: The value of <span class="codefrag">${mapred.work.output.dir}</span> during
execution of a particular task-attempt is actually
<span class="codefrag">${mapred.output.dir}/_temporary/_${taskid}</span>, and this value is
set by the map-reduce framework. So, just create any side-files in the
path returned by
<a href="api/org/apache/hadoop/mapred/FileOutputFormat.html#getWorkOutputPath(org.apache.hadoop.mapred.JobConf)">
FileOutputFormat.getWorkOutputPath()</a> from the map/reduce
task to take advantage of this feature.</p>
<p>The entire discussion holds true for maps of jobs with
reducer=NONE (i.e. 0 reduces) since the output of the map, in that case,
goes directly to HDFS.</p>
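<p>As a rough sketch only (the side-file name below is a hypothetical
placeholder, <span class="codefrag">job</span> is the <span class="codefrag">JobConf</span> saved
in the task's <span class="codefrag">configure(JobConf)</span> method, and the usual
<span class="codefrag">org.apache.hadoop.fs</span> imports are assumed), a task could
create such a side-file from within its <span class="codefrag">map</span> or
<span class="codefrag">reduce</span> method:</p>
<p>
<span class="codefrag">// resolves to ${mapred.output.dir}/_temporary/_${taskid} at runtime</span>
<br>
<span class="codefrag">Path workDir = FileOutputFormat.getWorkOutputPath(job);</span>
<br>
<span class="codefrag">Path sideFile = new Path(workDir, "side-data.txt");</span>
<br>
<span class="codefrag">FSDataOutputStream out = sideFile.getFileSystem(job).create(sideFile);</span>
<br>
<span class="codefrag">out.writeBytes("some side output\n");</span>
<br>
<span class="codefrag">out.close();</span>
</p>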
<a name="N10AA5"></a><a name="RecordWriter"></a>
<h4>RecordWriter</h4>
<p>
<a href="api/org/apache/hadoop/mapred/RecordWriter.html">
RecordWriter</a> writes the output <span class="codefrag">&lt;key, value&gt;</span>
pairs to an output file.</p>
<p>RecordWriter implementations write the job outputs to the
<span class="codefrag">FileSystem</span>.</p>
<a name="N10ABC"></a><a name="Other+Useful+Features"></a>
<h3 class="h4">Other Useful Features</h3>
<a name="N10AC2"></a><a name="Counters"></a>
<h4>Counters</h4>
<p>
<span class="codefrag">Counters</span> represent global counters, defined either by
the Map-Reduce framework or applications. Each <span class="codefrag">Counter</span> can
be of any <span class="codefrag">Enum</span> type. Counters of a particular
<span class="codefrag">Enum</span> are bunched into groups of type
<span class="codefrag">Counters.Group</span>.</p>
<p>Applications can define arbitrary <span class="codefrag">Counters</span> (of type
<span class="codefrag">Enum</span>) and update them via
<a href="api/org/apache/hadoop/mapred/Reporter.html#incrCounter(java.lang.Enum, long)">
Reporter.incrCounter(Enum, long)</a> in the <span class="codefrag">map</span> and/or
<span class="codefrag">reduce</span> methods. These counters are then globally
aggregated by the framework.</p>
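<p>For instance, a short sketch of an application-defined counter (the enum
and counter names here are hypothetical; the WordCount v2.0 example below
shows a complete usage):</p>
<p>
<span class="codefrag">static enum MyCounters { MALFORMED_RECORDS }</span>
<br>
<span class="codefrag">// inside map() or reduce(), using the Reporter passed in by the framework:</span>
<br>
<span class="codefrag">reporter.incrCounter(MyCounters.MALFORMED_RECORDS, 1);</span>
</p>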
<a name="N10AED"></a><a name="DistributedCache"></a>
<h4>DistributedCache</h4>
<p>
<a href="api/org/apache/hadoop/filecache/DistributedCache.html">
DistributedCache</a> distributes application-specific, large, read-only
files efficiently.</p>
<p>
<span class="codefrag">DistributedCache</span> is a facility provided by the
Map-Reduce framework to cache files (text, archives, jars and so on)
needed by applications.</p>
<p>Applications specify the files to be cached via urls (hdfs:// or
http://) in the <span class="codefrag">JobConf</span>. The <span class="codefrag">DistributedCache</span>
assumes that the files specified via hdfs:// urls are already present
on the <span class="codefrag">FileSystem</span>.</p>
<p>The framework will copy the necessary files to the slave node
before any tasks for the job are executed on that node. Its
efficiency stems from the fact that the files are only copied once
per job and the ability to cache archives which are un-archived on
the slaves.</p>
<p>
<span class="codefrag">DistributedCache</span> tracks the modification timestamps of
the cached files. Clearly the cache files should not be modified by
the application or externally while the job is executing.</p>
<p>
<span class="codefrag">DistributedCache</span> can be used to distribute simple,
read-only data/text files and more complex types such as archives and
jars. Archives (zip files) are <em>un-archived</em> at the slave nodes.
Optionally users can also direct the <span class="codefrag">DistributedCache</span> to
<em>symlink</em> the cached file(s) into the <span class="codefrag">current working
directory</span> of the task via the
<a href="api/org/apache/hadoop/filecache/DistributedCache.html#createSymlink(org.apache.hadoop.conf.Configuration)">
DistributedCache.createSymlink(Configuration)</a> api. Files
have <em>execution permissions</em> set.</p>
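<p>As an illustrative sketch (the file name below is a hypothetical
placeholder; the WordCount v2.0 example below shows a complete usage), an
application might wire this up as follows:</p>
<p>
<span class="codefrag">// the file is assumed to already be present on HDFS</span>
<br>
<span class="codefrag">DistributedCache.addCacheFile(new Path("/user/joe/lookup.txt").toUri(), conf);</span>
<br>
<span class="codefrag">DistributedCache.createSymlink(conf);</span>
<br>
<span class="codefrag">// later, in the task's configure(JobConf) method (throws IOException):</span>
<br>
<span class="codefrag">Path[] cached = DistributedCache.getLocalCacheFiles(conf);</span>
</p>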
<a name="N10B2B"></a><a name="Tool"></a>
<h4>Tool</h4>
<p>The <a href="api/org/apache/hadoop/util/Tool.html">Tool</a>
interface supports the handling of generic Hadoop command-line options.
</p>
<p>
<span class="codefrag">Tool</span> is the standard for any Map-Reduce tool or
application. The application should delegate the handling of
standard command-line options to
<a href="api/org/apache/hadoop/util/GenericOptionsParser.html">
GenericOptionsParser</a> via
<a href="api/org/apache/hadoop/util/ToolRunner.html#run(org.apache.hadoop.util.Tool, java.lang.String[])">
ToolRunner.run(Tool, String[])</a> and only handle its custom
arguments.</p>
<p>
The generic Hadoop command-line options are:<br>
<span class="codefrag">
-conf &lt;configuration file&gt;
</span>
<br>
<span class="codefrag">
-D &lt;property=value&gt;
</span>
<br>
<span class="codefrag">
-fs &lt;local|namenode:port&gt;
</span>
<br>
<span class="codefrag">
-jt &lt;local|jobtracker:port&gt;
</span>
</p>
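<p>The skeleton of such a tool (shown in full in the WordCount v2.0 example
below; the class name here is a hypothetical placeholder) looks roughly
like this:</p>
<p>
<span class="codefrag">public class MyTool extends Configured implements Tool {</span>
<br>
<span class="codefrag">&nbsp;&nbsp;public int run(String[] args) throws Exception {</span>
<br>
<span class="codefrag">&nbsp;&nbsp;&nbsp;&nbsp;JobConf conf = new JobConf(getConf(), MyTool.class);</span>
<br>
<span class="codefrag">&nbsp;&nbsp;&nbsp;&nbsp;// ... configure and submit the job, handling only custom arguments ...</span>
<br>
<span class="codefrag">&nbsp;&nbsp;&nbsp;&nbsp;return 0;</span>
<br>
<span class="codefrag">&nbsp;&nbsp;}</span>
<br>
<span class="codefrag">&nbsp;&nbsp;public static void main(String[] args) throws Exception {</span>
<br>
<span class="codefrag">&nbsp;&nbsp;&nbsp;&nbsp;// ToolRunner parses the generic options before invoking run()</span>
<br>
<span class="codefrag">&nbsp;&nbsp;&nbsp;&nbsp;System.exit(ToolRunner.run(new Configuration(), new MyTool(), args));</span>
<br>
<span class="codefrag">&nbsp;&nbsp;}</span>
<br>
<span class="codefrag">}</span>
</p>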
<a name="N10B5D"></a><a name="IsolationRunner"></a>
<h4>IsolationRunner</h4>
<p>
<a href="api/org/apache/hadoop/mapred/IsolationRunner.html">
IsolationRunner</a> is a utility to help debug Map-Reduce programs.</p>
<p>To use the <span class="codefrag">IsolationRunner</span>, first set
<span class="codefrag">keep.failed.tasks.files</span> to <span class="codefrag">true</span>
(also see <span class="codefrag">keep.tasks.files.pattern</span>).</p>
<p>
Next, go to the node on which the failed task ran and go to the
<span class="codefrag">TaskTracker</span>'s local directory and run the
<span class="codefrag">IsolationRunner</span>:<br>
<span class="codefrag">$ cd &lt;local path&gt;/taskTracker/${taskid}/work</span>
<br>
<span class="codefrag">
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
</span>
</p>
<p>
<span class="codefrag">IsolationRunner</span> will run the failed task in a single
jvm, which can be in the debugger, over precisely the same input.</p>
<a name="N10B90"></a><a name="Debugging"></a>
<h4>Debugging</h4>
<p>The Map/Reduce framework provides a facility to run user-provided
scripts for debugging. When a map/reduce task fails, the user can run a
script to post-process the task's logs, i.e. the task's stdout,
stderr, syslog and jobconf. The stdout and stderr of the
user-provided debug script are printed on the diagnostics.
These outputs are also displayed on the job UI on demand. </p>
<p> In the following sections we discuss how to submit a debug script
along with the job. To submit a debug script, it first has to be
distributed; then the script has to be supplied via the configuration. </p>
<a name="N10B9C"></a><a name="How+to+distribute+script+file%3A"></a>
<h5> How to distribute script file: </h5>
<p>
To distribute the debug script file, first copy the file to the DFS.
The file can then be distributed by setting the property
"mapred.cache.files" to "path"#"script-name".
If more than one file has to be distributed, the files can be added
as comma-separated paths. This property can also be set via the APIs
<a href="api/org/apache/hadoop/filecache/DistributedCache.html#addCacheFile(java.net.URI,%20org.apache.hadoop.conf.Configuration)">
DistributedCache.addCacheFile(URI,conf) </a> and
<a href="api/org/apache/hadoop/filecache/DistributedCache.html#setCacheFiles(java.net.URI[],%20org.apache.hadoop.conf.Configuration)">
DistributedCache.setCacheFiles(URIs,conf) </a>, where each URI is of
the form "hdfs://host:port/'absolutepath'#'script-name'".
For Streaming, the file can be added through the
command-line option -cacheFile.
</p>
<p>
The files have to be symlinked in the current working directory
of the task. To create a symlink for the file, set the property
"mapred.create.symlink" to "yes". This can also be done via the
<a href="api/org/apache/hadoop/filecache/DistributedCache.html#createSymlink(org.apache.hadoop.conf.Configuration)">
DistributedCache.createSymlink(Configuration) </a> api.
</p>
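<p>Put together, a minimal sketch of the relevant configuration (the script
path and symlink name below are hypothetical placeholders):</p>
<p>
<span class="codefrag">conf.set("mapred.cache.files", "hdfs://host:port/debug/debug-script.sh#debugscript");</span>
<br>
<span class="codefrag">conf.set("mapred.create.symlink", "yes");</span>
</p>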
<a name="N10BB5"></a><a name="How+to+submit+script%3A"></a>
<h5> How to submit script: </h5>
<p> A quick way to submit debug script is to set values for the
properties "mapred.map.task.debug.script" and
"mapred.reduce.task.debug.script" for debugging map task and reduce
task respectively. These properties can also be set by using APIs
<a href="api/org/apache/hadoop/mapred/JobConf.html#setMapDebugScript(java.lang.String)">
JobConf.setMapDebugScript(String) </a> and
<a href="api/org/apache/hadoop/mapred/JobConf.html#setReduceDebugScript(java.lang.String)">
JobConf.setReduceDebugScript(String) </a>. For streaming, debug
script can be submitted with command-line options -mapdebug,
-reducedebug for debugging mapper and reducer respectively.</p>
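<p>For example, assuming the script was symlinked as "debugscript" as
described above, a sketch of the job configuration would be:</p>
<p>
<span class="codefrag">conf.setMapDebugScript("./debugscript");</span>
<br>
<span class="codefrag">conf.setReduceDebugScript("./debugscript");</span>
</p>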
<p>The arguments of the script are the task's stdout, stderr,
syslog and jobconf files. The debug command, run on the node where
the map/reduce task failed, is: <br>
<span class="codefrag"> $script $stdout $stderr $syslog $jobconf </span>
</p>
<p> Pipes programs have the C++ program name as a fifth argument
to the command. Thus, for pipes programs the command is <br>
<span class="codefrag">$script $stdout $stderr $syslog $jobconf $program </span>
</p>
<a name="N10BD7"></a><a name="Default+Behavior%3A"></a>
<h5> Default Behavior: </h5>
<p> For pipes, a default script is run which processes core dumps under
gdb, prints the stack trace and gives information about running threads. </p>
<a name="N10BE2"></a><a name="JobControl"></a>
<h4>JobControl</h4>
<p>
<a href="api/org/apache/hadoop/mapred/jobcontrol/package-summary.html">
JobControl</a> is a utility which encapsulates a set of Map-Reduce jobs
and their dependencies.</p>
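<p>A rough sketch of its use, assuming two already-configured
<span class="codefrag">JobConf</span> instances and the
<span class="codefrag">org.apache.hadoop.mapred.jobcontrol</span>
<span class="codefrag">Job</span> and <span class="codefrag">JobControl</span> classes
(the group name and job variables are hypothetical placeholders):</p>
<p>
<span class="codefrag">JobControl control = new JobControl("chained-jobs");</span>
<br>
<span class="codefrag">Job first = new Job(firstConf);</span>
<br>
<span class="codefrag">Job second = new Job(secondConf);</span>
<br>
<span class="codefrag">second.addDependingJob(first);  // second runs only after first succeeds</span>
<br>
<span class="codefrag">control.addJob(first);</span>
<br>
<span class="codefrag">control.addJob(second);</span>
<br>
<span class="codefrag">new Thread(control).start();  // JobControl implements Runnable</span>
</p>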
<a name="N10BEF"></a><a name="Data+Compression"></a>
<h4>Data Compression</h4>
<p>Hadoop Map-Reduce provides facilities for the application-writer to
specify compression for both intermediate map-outputs and the
job-outputs i.e. output of the reduces. It also comes bundled with
<a href="api/org/apache/hadoop/io/compress/CompressionCodec.html">
CompressionCodec</a> implementations for the
<a href="http://www.zlib.net/">zlib</a> and <a href="http://www.oberhumer.com/opensource/lzo/">lzo</a> compression
algorithms. The <a href="http://www.gzip.org/">gzip</a> file format is also
supported.</p>
<p>Hadoop also provides native implementations of the above compression
codecs for reasons of both performance (zlib) and non-availability of
Java libraries (lzo). More details on their usage and availability are
available <a href="native_libraries.html">here</a>.</p>
<a name="N10C0F"></a><a name="Intermediate+Outputs"></a>
<h5>Intermediate Outputs</h5>
<p>Applications can control compression of intermediate map-outputs
via the
<a href="api/org/apache/hadoop/mapred/JobConf.html#setCompressMapOutput(boolean)">
JobConf.setCompressMapOutput(boolean)</a> api and the
<span class="codefrag">CompressionCodec</span> to be used via the
<a href="api/org/apache/hadoop/mapred/JobConf.html#setMapOutputCompressorClass(java.lang.Class)">
JobConf.setMapOutputCompressorClass(Class)</a> api. Since
the intermediate map-outputs are always stored in the
<a href="api/org/apache/hadoop/io/SequenceFile.html">SequenceFile</a>
format, the
<a href="api/org/apache/hadoop/io/SequenceFile.CompressionType.html">
SequenceFile.CompressionType</a> (i.e.
<a href="api/org/apache/hadoop/io/SequenceFile.CompressionType.html#RECORD">
RECORD</a> /
<a href="api/org/apache/hadoop/io/SequenceFile.CompressionType.html#BLOCK">
BLOCK</a> - defaults to <span class="codefrag">RECORD</span>) can be specified via the
<a href="api/org/apache/hadoop/mapred/JobConf.html#setMapOutputCompressionType(org.apache.hadoop.io.SequenceFile.CompressionType)">
JobConf.setMapOutputCompressionType(SequenceFile.CompressionType)</a>
api.</p>
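<p>As a short sketch (assuming
<span class="codefrag">org.apache.hadoop.io.compress.GzipCodec</span> and
<span class="codefrag">org.apache.hadoop.io.SequenceFile</span> are imported):</p>
<p>
<span class="codefrag">conf.setCompressMapOutput(true);</span>
<br>
<span class="codefrag">conf.setMapOutputCompressorClass(GzipCodec.class);</span>
<br>
<span class="codefrag">conf.setMapOutputCompressionType(SequenceFile.CompressionType.BLOCK);</span>
</p>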
<a name="N10C3B"></a><a name="Job+Outputs"></a>
<h5>Job Outputs</h5>
<p>Applications can control compression of job-outputs via the
<a href="api/org/apache/hadoop/mapred/OutputFormatBase.html#setCompressOutput(org.apache.hadoop.mapred.JobConf,%20boolean)">
OutputFormatBase.setCompressOutput(JobConf, boolean)</a> api and the
<span class="codefrag">CompressionCodec</span> to be used can be specified via the
<a href="api/org/apache/hadoop/mapred/OutputFormatBase.html#setOutputCompressorClass(org.apache.hadoop.mapred.JobConf,%20java.lang.Class)">
OutputFormatBase.setOutputCompressorClass(JobConf, Class)</a> api.</p>
<p>If the job outputs are to be stored in the
<a href="api/org/apache/hadoop/mapred/SequenceFileOutputFormat.html">
SequenceFileOutputFormat</a>, the required
<span class="codefrag">SequenceFile.CompressionType</span> (i.e. <span class="codefrag">RECORD</span> /
<span class="codefrag">BLOCK</span> - defaults to <span class="codefrag">RECORD</span>) can be specified
via the
<a href="api/org/apache/hadoop/mapred/SequenceFileOutputFormat.html#setOutputCompressionType(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.io.SequenceFile.CompressionType)">
SequenceFileOutputFormat.setOutputCompressionType(JobConf,
SequenceFile.CompressionType)</a> api.</p>
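<p>Similarly, a sketch for job-outputs (again assuming
<span class="codefrag">org.apache.hadoop.io.compress.GzipCodec</span> is imported):</p>
<p>
<span class="codefrag">OutputFormatBase.setCompressOutput(conf, true);</span>
<br>
<span class="codefrag">OutputFormatBase.setOutputCompressorClass(conf, GzipCodec.class);</span>
<br>
<span class="codefrag">// only when the job uses SequenceFileOutputFormat:</span>
<br>
<span class="codefrag">SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);</span>
</p>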
</div>
<a name="N10C6A"></a><a name="Example%3A+WordCount+v2.0"></a>
<h2 class="h3">Example: WordCount v2.0</h2>
<div class="section">
<p>Here is a more complete <span class="codefrag">WordCount</span> which uses many of the
features provided by the Map-Reduce framework we discussed so far.</p>
<p>This needs the HDFS to be up and running, especially for the
<span class="codefrag">DistributedCache</span>-related features. Hence it only works with a
<a href="quickstart.html#SingleNodeSetup">pseudo-distributed</a> or
<a href="quickstart.html#Fully-Distributed+Operation">fully-distributed</a>
Hadoop installation.</p>
<a name="N10C84"></a><a name="Source+Code-N10C84"></a>
<h3 class="h4">Source Code</h3>
<table class="ForrestTable" cellspacing="1" cellpadding="4">
<tr>
<th colspan="1" rowspan="1"></th>
<th colspan="1" rowspan="1">WordCount.java</th>
</tr>
<tr>
<td colspan="1" rowspan="1">1.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">package org.myorg;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">2.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">3.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.io.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">4.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">5.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">6.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.fs.Path;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">7.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.filecache.DistributedCache;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">8.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.conf.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">9.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.io.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">10.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.mapred.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">11.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">12.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">13.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">public class WordCount extends Configured implements Tool {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">14.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">15.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Map extends MapReduceBase
implements Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">16.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">17.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
static enum Counters { INPUT_WORDS }
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">18.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">19.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
private final static IntWritable one = new IntWritable(1);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">20.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private Text word = new Text();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">21.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">22.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private boolean caseSensitive = true;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">23.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private Set&lt;String&gt; patternsToSkip = new HashSet&lt;String&gt;();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">24.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">25.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private long numRecords = 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">26.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private String inputFile;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">27.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">28.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">public void configure(JobConf job) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">29.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">30.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">inputFile = job.get("map.input.file");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">31.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">32.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">if (job.getBoolean("wordcount.skip.patterns", false)) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">33.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">Path[] patternsFiles = new Path[0];</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">34.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">try {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">35.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
patternsFiles = DistributedCache.getLocalCacheFiles(job);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">36.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">} catch (IOException ioe) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">37.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
System.err.println("Caught exception while getting cached files: "
+ StringUtils.stringifyException(ioe));
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">38.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">39.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">for (Path patternsFile : patternsFiles) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">40.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">parseSkipFile(patternsFile);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">41.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">42.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">43.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">44.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">45.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private void parseSkipFile(Path patternsFile) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">46.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">try {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">47.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
BufferedReader fis =
new BufferedReader(new FileReader(patternsFile.toString()));
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">48.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">String pattern = null;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">49.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while ((pattern = fis.readLine()) != null) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">50.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">patternsToSkip.add(pattern);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">51.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">52.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">} catch (IOException ioe) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">53.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
System.err.println("Caught exception while parsing the cached file '" +
patternsFile + "' : " +
StringUtils.stringifyException(ioe));
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">54.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">55.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">56.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">57.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void map(LongWritable key, Text value,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">58.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
String line =
(caseSensitive) ? value.toString() :
value.toString().toLowerCase();
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">59.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">60.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">for (String pattern : patternsToSkip) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">61.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">line = line.replaceAll(pattern, "");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">62.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">63.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">64.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">StringTokenizer tokenizer = new StringTokenizer(line);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">65.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (tokenizer.hasMoreTokens()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">66.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">word.set(tokenizer.nextToken());</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">67.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(word, one);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">68.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">reporter.incrCounter(Counters.INPUT_WORDS, 1);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">69.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">70.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">71.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">if ((++numRecords % 100) == 0) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">72.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
reporter.setStatus("Finished processing " + numRecords +
" records " + "from the input file: " +
inputFile);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">73.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">74.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">75.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">76.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">77.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Reduce extends MapReduceBase implements
Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">78.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void reduce(Text key, Iterator&lt;IntWritable&gt; values,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">79.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">int sum = 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">80.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (values.hasNext()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">81.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">sum += values.next().get();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">82.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">83.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(key, new IntWritable(sum));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">84.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">85.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">86.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">87.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">public int run(String[] args) throws Exception {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">88.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
JobConf conf = new JobConf(getConf(), WordCount.class);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">89.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setJobName("wordcount");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">90.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">91.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputKeyClass(Text.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">92.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputValueClass(IntWritable.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">93.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">94.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setMapperClass(Map.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">95.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setCombinerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">96.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setReducerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">97.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">98.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setInputFormat(TextInputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">99.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputFormat(TextOutputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">100.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">101.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
List&lt;String&gt; other_args = new ArrayList&lt;String&gt;();
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">102.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">for (int i=0; i &lt; args.length; ++i) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">103.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">if ("-skip".equals(args[i])) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">104.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">105.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
conf.setBoolean("wordcount.skip.patterns", true);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">106.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">} else {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">107.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">other_args.add(args[i]);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">108.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">109.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">110.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">111.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">112.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">113.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">114.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">JobClient.runJob(conf);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">115.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">return 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">116.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">117.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">118.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static void main(String[] args) throws Exception {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">119.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
int res = ToolRunner.run(new Configuration(), new WordCount(),
args);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">120.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">System.exit(res);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">121.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">122.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">123.</td>
<td colspan="1" rowspan="1"></td>
</tr>
</table>
<a name="N113E6"></a><a name="Sample+Runs"></a>
<h3 class="h4">Sample Runs</h3>
<p>Sample text-files as input:</p>
<p>
<span class="codefrag">$ bin/hadoop dfs -ls /usr/joe/wordcount/input/</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file02</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">Hello World, Bye World!</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02</span>
<br>
<span class="codefrag">Hello Hadoop, Goodbye to hadoop.</span>
</p>
<p>Run the application:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
/usr/joe/wordcount/input /usr/joe/wordcount/output
</span>
</p>
<p>Output:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">Bye 1</span>
<br>
<span class="codefrag">Goodbye 1</span>
<br>
<span class="codefrag">Hadoop, 1</span>
<br>
<span class="codefrag">Hello 2</span>
<br>
<span class="codefrag">World! 1</span>
<br>
<span class="codefrag">World, 1</span>
<br>
<span class="codefrag">hadoop. 1</span>
<br>
<span class="codefrag">to 1</span>
<br>
</p>
<p>Notice that the inputs differ from the first version we looked at,
and how they affect the outputs.</p>
<p>Now, let's plug in a pattern-file which lists the word-patterns to be
ignored, via the <span class="codefrag">DistributedCache</span>.</p>
<p>
<span class="codefrag">$ hadoop dfs -cat /user/joe/wordcount/patterns.txt</span>
<br>
<span class="codefrag">\.</span>
<br>
<span class="codefrag">\,</span>
<br>
<span class="codefrag">\!</span>
<br>
<span class="codefrag">to</span>
<br>
</p>
<p>Run it again, this time with more options:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
-Dwordcount.case.sensitive=true /usr/joe/wordcount/input
/usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
</span>
</p>
<p>As expected, the output:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">Bye 1</span>
<br>
<span class="codefrag">Goodbye 1</span>
<br>
<span class="codefrag">Hadoop 1</span>
<br>
<span class="codefrag">Hello 2</span>
<br>
<span class="codefrag">World 2</span>
<br>
<span class="codefrag">hadoop 1</span>
<br>
</p>
<p>Run it once more, this time switch-off case-sensitivity:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
-Dwordcount.case.sensitive=false /usr/joe/wordcount/input
/usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
</span>
</p>
<p>Sure enough, the output:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">bye 1</span>
<br>
<span class="codefrag">goodbye 1</span>
<br>
<span class="codefrag">hadoop 2</span>
<br>
<span class="codefrag">hello 2</span>
<br>
<span class="codefrag">world 2</span>
<br>
</p>
<a name="N114BA"></a><a name="Highlights"></a>
<h3 class="h4">Highlights</h3>
<p>The second version of <span class="codefrag">WordCount</span> improves upon the
previous one by using some features offered by the Map-Reduce framework:
</p>
<ul>
<li>
Demonstrates how applications can access configuration parameters
in the <span class="codefrag">configure</span> method of the <span class="codefrag">Mapper</span> (and
<span class="codefrag">Reducer</span>) implementations (lines 28-43).
</li>
<li>
Demonstrates how the <span class="codefrag">DistributedCache</span> can be used to
distribute read-only data needed by the jobs. Here it allows the user
to specify word-patterns to skip while counting (line 104).
</li>
<li>
Demonstrates the utility of the <span class="codefrag">Tool</span> interface and the
<span class="codefrag">GenericOptionsParser</span> to handle generic Hadoop
command-line options (lines 87-116, 119).
</li>
<li>
Demonstrates how applications can use <span class="codefrag">Counters</span> (line 68)
and how they can set application-specific status information via
the <span class="codefrag">Reporter</span> instance passed to the <span class="codefrag">map</span> (and
<span class="codefrag">reduce</span>) method (line 72).
</li>
</ul>
</div>
<p>
<em>Java and JNI are trademarks or registered trademarks of
Sun Microsystems, Inc. in the United States and other countries.</em>
</p>
</div>
<!--+
|end content
+-->
<div class="clearboth">&nbsp;</div>
</div>
<div id="footer">
<!--+
|start bottomstrip
+-->
<div class="lastmodified">
<script type="text/javascript"><!--
document.write("Last Published: " + document.lastModified);
// --></script>
</div>
<div class="copyright">
Copyright &copy;
2007 <a href="http://www.apache.org/licenses/">The Apache Software Foundation.</a>
</div>
<!--+
|end bottomstrip
+-->
</div>
</body>
</html>