<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<META http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta content="Apache Forrest" name="Generator">
<meta name="Forrest-version" content="0.8">
<meta name="Forrest-skin-name" content="pelt">
<title>Hadoop Map/Reduce Tutorial</title>
<link type="text/css" href="skin/basic.css" rel="stylesheet">
<link media="screen" type="text/css" href="skin/screen.css" rel="stylesheet">
<link media="print" type="text/css" href="skin/print.css" rel="stylesheet">
<link type="text/css" href="skin/profile.css" rel="stylesheet">
<script src="skin/getBlank.js" language="javascript" type="text/javascript"></script><script src="skin/getMenu.js" language="javascript" type="text/javascript"></script><script src="skin/fontsize.js" language="javascript" type="text/javascript"></script>
<link rel="shortcut icon" href="images/favicon.ico">
</head>
<body onload="init()">
<script type="text/javascript">ndeSetTextSize();</script>
<div id="top">
<!--+
|breadtrail
+-->
<div class="breadtrail">
<a href="http://www.apache.org/">Apache</a> &gt; <a href="http://hadoop.apache.org/">Hadoop</a> &gt; <a href="http://hadoop.apache.org/core/">Core</a><script src="skin/breadcrumbs.js" language="JavaScript" type="text/javascript"></script>
</div>
<!--+
|header
+-->
<div class="header">
<!--+
|start group logo
+-->
<div class="grouplogo">
<a href="http://hadoop.apache.org/"><img class="logoImage" alt="Hadoop" src="images/hadoop-logo.jpg" title="Apache Hadoop"></a>
</div>
<!--+
|end group logo
+-->
<!--+
|start Project Logo
+-->
<div class="projectlogo">
<a href="http://hadoop.apache.org/core/"><img class="logoImage" alt="Hadoop" src="images/core-logo.gif" title="Scalable Computing Platform"></a>
</div>
<!--+
|end Project Logo
+-->
<!--+
|start Search
+-->
<div class="searchbox">
<form action="http://www.google.com/search" method="get" class="roundtopsmall">
<input value="hadoop.apache.org" name="sitesearch" type="hidden"><input onFocus="getBlank (this, 'Search the site with google');" size="25" name="q" id="query" type="text" value="Search the site with google">&nbsp;
<input name="Search" value="Search" type="submit">
</form>
</div>
<!--+
|end search
+-->
<!--+
|start Tabs
+-->
<ul id="tabs">
<li>
<a class="unselected" href="http://hadoop.apache.org/core/">Project</a>
</li>
<li>
<a class="unselected" href="http://wiki.apache.org/hadoop">Wiki</a>
</li>
<li class="current">
<a class="selected" href="index.html">Hadoop 0.18 Documentation</a>
</li>
</ul>
<!--+
|end Tabs
+-->
</div>
</div>
<div id="main">
<div id="publishedStrip">
<!--+
|start Subtabs
+-->
<div id="level2tabs"></div>
<!--+
|end Subtabs
+-->
<script type="text/javascript"><!--
document.write("Last Published: " + document.lastModified);
// --></script>
</div>
<!--+
|breadtrail
+-->
<div class="breadtrail">
&nbsp;
</div>
<!--+
|start Menu, mainarea
+-->
<!--+
|start Menu
+-->
<div id="menu">
<div onclick="SwitchMenu('menu_selected_1.1', 'skin/')" id="menu_selected_1.1Title" class="menutitle" style="background-image: url('skin/images/chapter_open.gif');">Documentation</div>
<div id="menu_selected_1.1" class="selectedmenuitemgroup" style="display: block;">
<div class="menuitem">
<a href="index.html">Overview</a>
</div>
<div class="menuitem">
<a href="quickstart.html">Quick Start</a>
</div>
<div class="menuitem">
<a href="cluster_setup.html">Cluster Setup</a>
</div>
<div class="menuitem">
<a href="hdfs_design.html">HDFS Architecture</a>
</div>
<div class="menuitem">
<a href="hdfs_user_guide.html">HDFS User Guide</a>
</div>
<div class="menuitem">
<a href="hdfs_permissions_guide.html">HDFS Permissions Guide</a>
</div>
<div class="menuitem">
<a href="hdfs_quota_admin_guide.html">HDFS Quotas Administrator Guide</a>
</div>
<div class="menuitem">
<a href="commands_manual.html">Commands Manual</a>
</div>
<div class="menuitem">
<a href="hdfs_shell.html">FS Shell Guide</a>
</div>
<div class="menuitem">
<a href="distcp.html">DistCp Guide</a>
</div>
<div class="menupage">
<div class="menupagetitle">Map-Reduce Tutorial</div>
</div>
<div class="menuitem">
<a href="native_libraries.html">Native Hadoop Libraries</a>
</div>
<div class="menuitem">
<a href="streaming.html">Streaming</a>
</div>
<div class="menuitem">
<a href="hadoop_archives.html">Hadoop Archives</a>
</div>
<div class="menuitem">
<a href="hod.html">Hadoop On Demand</a>
</div>
<div class="menuitem">
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/index.html">API Docs</a>
</div>
<div class="menuitem">
<a href="http://hadoop.apache.org/core/docs/r0.18.2/jdiff/changes.html">API Changes</a>
</div>
<div class="menuitem">
<a href="http://wiki.apache.org/hadoop/">Wiki</a>
</div>
<div class="menuitem">
<a href="http://wiki.apache.org/hadoop/FAQ">FAQ</a>
</div>
<div class="menuitem">
<a href="http://hadoop.apache.org/core/mailing_lists.html">Mailing Lists</a>
</div>
<div class="menuitem">
<a href="http://hadoop.apache.org/core/docs/r0.18.2/releasenotes.html">Release Notes</a>
</div>
<div class="menuitem">
<a href="http://hadoop.apache.org/core/docs/r0.18.2/changes.html">Change Log</a>
</div>
</div>
<div id="credit"></div>
<div id="roundbottom">
<img style="display: none" class="corner" height="15" width="15" alt="" src="skin/images/rc-b-l-15-1body-2menu-3menu.png"></div>
<!--+
|alternative credits
+-->
<div id="credit2"></div>
</div>
<!--+
|end Menu
+-->
<!--+
|start content
+-->
<div id="content">
<div title="Portable Document Format" class="pdflink">
<a class="dida" href="mapred_tutorial.pdf"><img alt="PDF -icon" src="skin/images/pdfdoc.gif" class="skin"><br>
PDF</a>
</div>
<h1>Hadoop Map/Reduce Tutorial</h1>
<div id="minitoc-area">
<ul class="minitoc">
<li><a href="#%E7%9B%AE%E7%9A%84">Purpose</a></li>
<li><a href="#%E5%85%88%E5%86%B3%E6%9D%A1%E4%BB%B6">Prerequisites</a></li>
<li><a href="#%E6%A6%82%E8%BF%B0">Overview</a></li>
<li><a href="#%E8%BE%93%E5%85%A5%E4%B8%8E%E8%BE%93%E5%87%BA">Inputs and Outputs</a></li>
<li><a href="#%E4%BE%8B%E5%AD%90%EF%BC%9AWordCount+v1.0">Example: WordCount v1.0</a>
<ul class="minitoc">
<li><a href="#%E6%BA%90%E4%BB%A3%E7%A0%81">Source Code</a></li>
<li><a href="#%E7%94%A8%E6%B3%95">Usage</a></li>
<li><a href="#%E8%A7%A3%E9%87%8A">Walk-through</a></li>
</ul>
</li>
<li><a href="#Map%2FReduce+-+%E7%94%A8%E6%88%B7%E7%95%8C%E9%9D%A2">Map/Reduce - User Interfaces</a>
<ul class="minitoc">
<li><a href="#%E6%A0%B8%E5%BF%83%E5%8A%9F%E8%83%BD%E6%8F%8F%E8%BF%B0">Core Functionality</a>
<ul class="minitoc">
<li><a href="#Mapper">Mapper</a></li>
<li><a href="#Reducer">Reducer</a></li>
<li><a href="#Partitioner">Partitioner</a></li>
<li><a href="#Reporter">Reporter</a></li>
<li><a href="#OutputCollector">OutputCollector</a></li>
</ul>
</li>
<li><a href="#%E4%BD%9C%E4%B8%9A%E9%85%8D%E7%BD%AE">Job Configuration</a></li>
<li><a href="#%E4%BB%BB%E5%8A%A1%E7%9A%84%E6%89%A7%E8%A1%8C%E5%92%8C%E7%8E%AF%E5%A2%83">Task Execution and Environment</a></li>
<li><a href="#%E4%BD%9C%E4%B8%9A%E7%9A%84%E6%8F%90%E4%BA%A4%E4%B8%8E%E7%9B%91%E6%8E%A7">Job Submission and Monitoring</a>
<ul class="minitoc">
<li><a href="#%E4%BD%9C%E4%B8%9A%E7%9A%84%E6%8E%A7%E5%88%B6">Job Control</a></li>
</ul>
</li>
<li><a href="#%E4%BD%9C%E4%B8%9A%E7%9A%84%E8%BE%93%E5%85%A5">Job Input</a>
<ul class="minitoc">
<li><a href="#InputSplit">InputSplit</a></li>
<li><a href="#RecordReader">RecordReader</a></li>
</ul>
</li>
<li><a href="#%E4%BD%9C%E4%B8%9A%E7%9A%84%E8%BE%93%E5%87%BA">Job Output</a>
<ul class="minitoc">
<li><a href="#%E4%BB%BB%E5%8A%A1%E7%9A%84Side-Effect+File">Task Side-Effect Files</a></li>
<li><a href="#RecordWriter">RecordWriter</a></li>
</ul>
</li>
<li><a href="#%E5%85%B6%E4%BB%96%E6%9C%89%E7%94%A8%E7%9A%84%E7%89%B9%E6%80%A7">Other Useful Features</a>
<ul class="minitoc">
<li><a href="#Counters">Counters</a></li>
<li><a href="#DistributedCache">DistributedCache</a></li>
<li><a href="#Tool">Tool</a></li>
<li><a href="#IsolationRunner">IsolationRunner</a></li>
<li><a href="#Profiling">Profiling</a></li>
<li><a href="#%E8%B0%83%E8%AF%95">Debugging</a></li>
<li><a href="#JobControl">JobControl</a></li>
<li><a href="#%E6%95%B0%E6%8D%AE%E5%8E%8B%E7%BC%A9">Data Compression</a></li>
</ul>
</li>
</ul>
</li>
<li><a href="#%E4%BE%8B%E5%AD%90%EF%BC%9AWordCount+v2.0">Example: WordCount v2.0</a>
<ul class="minitoc">
<li><a href="#%E6%BA%90%E4%BB%A3%E7%A0%81-N10DC0">Source Code</a></li>
<li><a href="#%E8%BF%90%E8%A1%8C%E6%A0%B7%E4%BE%8B">Sample Runs</a></li>
<li><a href="#%E7%A8%8B%E5%BA%8F%E8%A6%81%E7%82%B9">Highlights</a></li>
</ul>
</li>
</ul>
</div>
<a name="N1000D"></a><a name="%E7%9B%AE%E7%9A%84"></a>
<h2 class="h3">Purpose</h2>
<div class="section">
<p>This document comprehensively describes all user-facing facets of the Hadoop Map/Reduce framework and serves as a tutorial.</p>
</div>
<a name="N10017"></a><a name="%E5%85%88%E5%86%B3%E6%9D%A1%E4%BB%B6"></a>
<h2 class="h3">Prerequisites</h2>
<div class="section">
<p>Ensure that Hadoop is installed, configured and running. For more information, see:</p>
<ul>
<li>
<a href="quickstart.html">Hadoop Quick Start</a> for first-time users.
</li>
<li>
<a href="cluster_setup.html">Hadoop Cluster Setup</a> for large, distributed clusters.
</li>
</ul>
</div>
<a name="N10032"></a><a name="%E6%A6%82%E8%BF%B0"></a>
<h2 class="h3">Overview</h2>
<div class="section">
<p>Hadoop Map/Reduce is a software framework for easily writing applications that process vast amounts of data (multi-terabyte data-sets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.</p>
<p>A Map/Reduce <em>job</em> usually splits the input data-set into independent chunks, which are processed by the <em>map tasks</em> in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the <em>reduce tasks</em>. Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them, and re-executing any failed tasks.</p>
<p>Typically the compute nodes and the storage nodes are the same; that is, the Map/Reduce framework and the <a href="hdfs_design.html">distributed file system</a> run on the same set of nodes. This configuration allows the framework to schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.</p>
<p>The Map/Reduce framework consists of a single master <span class="codefrag">JobTracker</span> and one slave <span class="codefrag">TaskTracker</span> per cluster node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them and re-executing any failed tasks. The slaves execute the tasks as directed by the master.</p>
<p>Minimally, applications specify the input/output locations (paths) and supply map and reduce functions via implementations of appropriate interfaces and/or abstract classes. These, and other job parameters, comprise the <em>job configuration</em>. The Hadoop <em>job client</em> then submits the job (jar/executable etc.) and configuration to the <span class="codefrag">JobTracker</span>, which in turn distributes the software/configuration to the slaves, schedules the tasks, monitors them, and provides status and diagnostic information to the job client.</p>
<p>Although the Hadoop framework is implemented in Java<sup>TM</sup>, Map/Reduce applications need not be written in Java.</p>
<ul>
<li>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/streaming/package-summary.html">
Hadoop Streaming</a> is a utility which allows users to create and run jobs with any executables
(e.g. shell utilities) as the mapper and/or the reducer.
</li>
<li>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/pipes/package-summary.html">
Hadoop Pipes</a> is a <a href="http://www.swig.org/">SWIG</a>-compatible C++ API
(non JNI<sup>TM</sup> based) which can also be used to implement Map/Reduce applications.
</li>
</ul>
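<p>To make the Streaming model concrete, here is a minimal, hypothetical sketch in Python of a word-count mapper/reducer pair written in the Streaming style (tab-separated key/value records, with the framework sorting the mapper output before the reducer sees it). It is illustrative only and uses no Hadoop API:</p>

```python
from itertools import groupby

def streaming_mapper(lines):
    # A Streaming mapper reads input lines and writes 'key<TAB>value'
    # records; here the key is a word and the value is the count 1.
    for line in lines:
        for word in line.split():
            yield word + "\t1"

def streaming_reducer(records):
    # A Streaming reducer receives records already sorted by key,
    # so all counts for one word arrive consecutively.
    split = (r.split("\t") for r in records)
    for word, group in groupby(split, key=lambda kv: kv[0]):
        yield "%s\t%d" % (word, sum(int(count) for _, count in group))

# The framework sorts the mapper output before invoking the reducer:
output = list(streaming_reducer(sorted(streaming_mapper(["Hello World Bye World"]))))
```

<p>In an actual Streaming job, each half would be a stand-alone executable reading from stdin and writing to stdout.</p>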
</div>
<a name="N10082"></a><a name="%E8%BE%93%E5%85%A5%E4%B8%8E%E8%BE%93%E5%87%BA"></a>
<h2 class="h3">Inputs and Outputs</h2>
<div class="section">
<p>The Map/Reduce framework operates exclusively on <span class="codefrag">&lt;key, value&gt;</span> pairs; that is,
the framework views the input to the job as a set of <span class="codefrag">&lt;key, value&gt;</span> pairs and produces a set of
<span class="codefrag">&lt;key, value&gt;</span> pairs as the output of the job, conceivably of different types.</p>
<p>The <span class="codefrag">key</span> and <span class="codefrag">value</span> classes have to be serializable by the framework
and hence need to implement the <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/io/Writable.html">Writable</a> interface.
Additionally, the <span class="codefrag">key</span> classes have to implement the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/io/WritableComparable.html">
WritableComparable</a> interface to facilitate sorting by the framework.
</p>
<p>The input and output types of a Map/Reduce job are:</p>
<p>
(input) <span class="codefrag">&lt;k1, v1&gt;</span>
-&gt;
<strong>map</strong>
-&gt;
<span class="codefrag">&lt;k2, v2&gt;</span>
-&gt;
<strong>combine</strong>
-&gt;
<span class="codefrag">&lt;k2, v2&gt;</span>
-&gt;
<strong>reduce</strong>
-&gt;
<span class="codefrag">&lt;k3, v3&gt;</span> (output)
</p>
</div>
<a name="N100C4"></a><a name="%E4%BE%8B%E5%AD%90%EF%BC%9AWordCount+v1.0"></a>
<h2 class="h3">Example: WordCount v1.0</h2>
<div class="section">
<p>Before we jump into the details, let us walk through an example Map/Reduce application to get a feel for how it works.</p>
<p>
<span class="codefrag">WordCount</span> is a simple application that counts the number of occurrences of each word in a given input set.</p>
<p>This works with a
<a href="quickstart.html#Standalone+Operation">local-standalone</a>,
<a href="quickstart.html#SingleNodeSetup">pseudo-distributed</a> or
<a href="quickstart.html#Fully-Distributed+Operation">fully-distributed</a>
Hadoop installation.</p>
<a name="N100E1"></a><a name="%E6%BA%90%E4%BB%A3%E7%A0%81"></a>
<h3 class="h4">Source Code</h3>
<table class="ForrestTable" cellspacing="1" cellpadding="4">
<tr>
<th colspan="1" rowspan="1"></th>
<th colspan="1" rowspan="1">WordCount.java</th>
</tr>
<tr>
<td colspan="1" rowspan="1">1.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">package org.myorg;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">2.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">3.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.io.IOException;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">4.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">5.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">6.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.fs.Path;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">7.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.conf.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">8.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.io.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">9.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.mapred.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">10.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">11.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">12.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">public class WordCount {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">13.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">14.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Map extends MapReduceBase
implements Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">15.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
private final static IntWritable one = new IntWritable(1);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">16.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private Text word = new Text();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">17.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">18.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void map(LongWritable key, Text value,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">19.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">String line = value.toString();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">20.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">StringTokenizer tokenizer = new StringTokenizer(line);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">21.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (tokenizer.hasMoreTokens()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">22.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">word.set(tokenizer.nextToken());</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">23.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(word, one);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">24.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">25.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">26.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">27.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">28.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Reduce extends MapReduceBase implements
Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">29.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void reduce(Text key, Iterator&lt;IntWritable&gt; values,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">30.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">int sum = 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">31.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (values.hasNext()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">32.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">sum += values.next().get();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">33.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">34.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(key, new IntWritable(sum));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">35.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">36.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">37.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">38.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static void main(String[] args) throws Exception {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">39.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
JobConf conf = new JobConf(WordCount.class);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">40.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setJobName("wordcount");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">41.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">42.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputKeyClass(Text.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">43.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputValueClass(IntWritable.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">44.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">45.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setMapperClass(Map.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">46.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setCombinerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">47.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setReducerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">48.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">49.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setInputFormat(TextInputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">50.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputFormat(TextOutputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">51.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">52.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileInputFormat.setInputPaths(conf, new Path(args[0]));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">53.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileOutputFormat.setOutputPath(conf, new Path(args[1]));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">54.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">55.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">JobClient.runJob(conf);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">56.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">57.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">58.</td>
<td colspan="1" rowspan="1"></td>
</tr>
</table>
<a name="N10463"></a><a name="%E7%94%A8%E6%B3%95"></a>
<h3 class="h4">Usage</h3>
<p>Assuming <span class="codefrag">HADOOP_HOME</span> is the root of the installation and <span class="codefrag">HADOOP_VERSION</span> is the Hadoop version installed, compile <span class="codefrag">WordCount.java</span> and create a jar:</p>
<p>
<span class="codefrag">$ mkdir wordcount_classes</span>
<br>
<span class="codefrag">
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar
-d wordcount_classes WordCount.java
</span>
<br>
<span class="codefrag">$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .</span>
</p>
<p>Assuming that:</p>
<ul>
<li>
<span class="codefrag">/usr/joe/wordcount/input</span> - input directory in HDFS
</li>
<li>
<span class="codefrag">/usr/joe/wordcount/output</span> - output directory in HDFS
</li>
</ul>
<p>Sample text files as input:</p>
<p>
<span class="codefrag">$ bin/hadoop dfs -ls /usr/joe/wordcount/input/</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file02</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">Hello World Bye World</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02</span>
<br>
<span class="codefrag">Hello Hadoop Goodbye Hadoop</span>
</p>
<p>Run the application:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
/usr/joe/wordcount/input /usr/joe/wordcount/output
</span>
</p>
<p>Output:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">Bye 1</span>
<br>
<span class="codefrag">Goodbye 1</span>
<br>
<span class="codefrag">Hadoop 2</span>
<br>
<span class="codefrag">Hello 2</span>
<br>
<span class="codefrag">World 2</span>
<br>
</p>
<p>Applications can specify a comma-separated list of paths which would be present in the current working directory of the task using the option <span class="codefrag">-files</span>. The <span class="codefrag">-libjars</span> option allows applications to add jars to the classpaths of the maps and reduces. The <span class="codefrag">-archives</span> option allows them to pass archives as arguments; these archives are unarchived in the current working directory of the task, and a symlink named after the archive points to the unarchived directory.
More details about command-line options are available in the
<a href="commands_manual.html">Commands manual</a>.</p>
<p>Running <span class="codefrag">wordcount</span> with <span class="codefrag">-libjars</span> and <span class="codefrag">-files</span>:<br>
<span class="codefrag">hadoop jar hadoop-examples.jar wordcount -files cachefile.txt
-libjars mylib.jar input output </span>
</p>
<a name="N10504"></a><a name="%E8%A7%A3%E9%87%8A"></a>
<h3 class="h4">Walk-through</h3>
<p>
<span class="codefrag">WordCount</span> is a fairly straight-forward application.</p>
<p>The <span class="codefrag">map</span> method (lines 18-25) in the
<span class="codefrag">Mapper</span> implementation (lines 14-26) processes one line at a time,
as provided by the specified <span class="codefrag">TextInputFormat</span> (line 49). It then splits the line into tokens separated by whitespace, via the <span class="codefrag">StringTokenizer</span>, and emits a key-value pair of <span class="codefrag">&lt; &lt;word&gt;, 1&gt;</span>.</p>
<p>
For the first input, the map emits:<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
<span class="codefrag">&lt; World, 1&gt;</span>
<br>
<span class="codefrag">&lt; Bye, 1&gt;</span>
<br>
<span class="codefrag">&lt; World, 1&gt;</span>
<br>
</p>
<p>
For the second input, the map emits:<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 1&gt;</span>
<br>
<span class="codefrag">&lt; Goodbye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 1&gt;</span>
<br>
</p>
<p>We'll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later in the tutorial.</p>
<p>
<span class="codefrag">WordCount</span> also specifies a <span class="codefrag">combiner</span> (line 46). Hence, the output of each map is passed through the local combiner (which is the same as the <span class="codefrag">Reducer</span> as per the job configuration) for local aggregation, after being sorted on the <em>key</em>s.</p>
<p>
The output of the first map:<br>
<span class="codefrag">&lt; Bye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
<span class="codefrag">&lt; World, 2&gt;</span>
<br>
</p>
<p>
The output of the second map:<br>
<span class="codefrag">&lt; Goodbye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 2&gt;</span>
<br>
<span class="codefrag">&lt; Hello, 1&gt;</span>
<br>
</p>
<p>
The <span class="codefrag">reduce</span> method (lines 29-35) in the <span class="codefrag">Reducer</span> implementation (lines 28-36)
just sums up the values, which are the occurrence counts for each key (i.e. words in this example).
</p>
<p>
Thus the output of the job is:<br>
<span class="codefrag">&lt; Bye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Goodbye, 1&gt;</span>
<br>
<span class="codefrag">&lt; Hadoop, 2&gt;</span>
<br>
<span class="codefrag">&lt; Hello, 2&gt;</span>
<br>
<span class="codefrag">&lt; World, 2&gt;</span>
<br>
</p>
<p>The <span class="codefrag">main</span> method specifies various facets of the job,
such as the input/output paths (passed via the command line), key/value types and input/output formats, in the <span class="codefrag">JobConf</span>. It then calls <span class="codefrag">JobClient.runJob</span> (line 55) to submit the job and monitor its progress.</p>
<p>We'll learn more about <span class="codefrag">JobConf</span>, <span class="codefrag">JobClient</span>,
<span class="codefrag">Tool</span> and other interfaces and classes a bit later in the tutorial.</p>
</div>
<a name="N105B5"></a><a name="Map%2FReduce+-+%E7%94%A8%E6%88%B7%E7%95%8C%E9%9D%A2"></a>
<h2 class="h3">Map/Reduce - User Interfaces</h2>
<div class="section">
<p>This section provides a reasonable amount of detail on every user-facing aspect of the Map/Reduce framework. This should help users implement, configure and tune their jobs in a fine-grained manner. However, please note that the javadoc for each class/interface remains the most comprehensive documentation available; this is only meant to be a tutorial.
</p>
<p>Let us first take the <span class="codefrag">Mapper</span> and <span class="codefrag">Reducer</span> interfaces. Applications typically implement them to provide the <span class="codefrag">map</span> and <span class="codefrag">reduce</span> methods.
</p>
<p>We will then discuss other core interfaces including
<span class="codefrag">JobConf</span>, <span class="codefrag">JobClient</span>, <span class="codefrag">Partitioner</span>,
<span class="codefrag">OutputCollector</span>, <span class="codefrag">Reporter</span>,
<span class="codefrag">InputFormat</span>, <span class="codefrag">OutputFormat</span> and others.</p>
<p>Finally, we will wrap up by discussing some useful features of the framework such as the <span class="codefrag">DistributedCache</span>,
<span class="codefrag">IsolationRunner</span> etc.</p>
<a name="N105EE"></a><a name="%E6%A0%B8%E5%BF%83%E5%8A%9F%E8%83%BD%E6%8F%8F%E8%BF%B0"></a>
<h3 class="h4">Core Functionality</h3>
<p>Applications typically implement the <span class="codefrag">Mapper</span> and <span class="codefrag">Reducer</span> interfaces to provide the
<span class="codefrag">map</span> and <span class="codefrag">reduce</span> methods. These form the core of the job.</p>
<a name="N10603"></a><a name="Mapper"></a>
<h4>Mapper</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Mapper.html">
Mapper</a> maps input key/value pairs to a set of intermediate key/value pairs.</p>
<p>Maps are the individual tasks that transform input records into intermediate records.
The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.</p>
<p>The Hadoop Map/Reduce framework spawns one map task for each <span class="codefrag">InputSplit</span> generated by the <span class="codefrag">InputFormat</span> for the job.</p>
<p>Overall, <span class="codefrag">Mapper</span> implementations are passed the <span class="codefrag">JobConf</span> for the job via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConfigurable.html#configure(org.apache.hadoop.mapred.JobConf)">
JobConfigurable.configure(JobConf)</a> method and can override it to initialize themselves. The framework then calls
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Mapper.html#map(K1, V1, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)">
map(WritableComparable, Writable, OutputCollector, Reporter)</a> for each key/value pair in the <span class="codefrag">InputSplit</span> for that task. Applications can then override the <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/io/Closeable.html#close()">Closeable.close()</a> method to perform any required cleanup.</p>
<p>Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/OutputCollector.html#collect(K, V)">
OutputCollector.collect(WritableComparable,Writable)</a>.</p>
<p>Applications can use the <span class="codefrag">Reporter</span> to report progress, set application-level status messages and update <span class="codefrag">Counters</span>, or just indicate that they are alive.</p>
<p>All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to the <span class="codefrag">Reducer</span>(s) to determine the final output. Users can control the grouping by specifying a <span class="codefrag">Comparator</span> via
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setOutputKeyComparatorClass(java.lang.Class)">
JobConf.setOutputKeyComparatorClass(Class)</a>.</p>
<p>
The <span class="codefrag">Mapper</span> outputs are sorted and then partitioned per <span class="codefrag">Reducer</span>. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which <span class="codefrag">Reducer</span> by implementing a custom <span class="codefrag">Partitioner</span>.</p>
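<p>By default, Hadoop uses a hash partitioner that sends a record to partition <span class="codefrag">(key.hashCode() &amp; Integer.MAX_VALUE) % numReduceTasks</span>. A hypothetical Python sketch of the same idea (illustrative only, not Hadoop code):</p>

```python
def hash_partition(key, num_reduce_tasks):
    # Mask to a non-negative value, then take the remainder, so the
    # partition index is always in [0, num_reduce_tasks).
    return (hash(key) & 0x7fffffff) % num_reduce_tasks

# All records sharing a key land in the same partition, so a single
# reduce task sees every value for that key.
partitions = [hash_partition(word, 4) for word in ["Hello", "World", "Hello"]]
```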
<p>Users can optionally specify a <span class="codefrag">combiner</span>, via <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setCombinerClass(java.lang.Class)">
JobConf.setCombinerClass(Class)</a>, to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the <span class="codefrag">Mapper</span> to the <span class="codefrag">Reducer</span>.
</p>
<p>The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed, and which <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/io/compress/CompressionCodec.html">
CompressionCodec</a> to use, via the <span class="codefrag">JobConf</span>.
</p>
<a name="N1067B"></a><a name="%E9%9C%80%E8%A6%81%E5%A4%9A%E5%B0%91%E4%B8%AAMap%EF%BC%9F"></a>
<h5>How Many Maps?</h5>
<p>The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.</p>
<p>The right level of parallelism for maps seems to be around 10-100 maps per node, although it has been set up to around 300 maps for very cpu-light map tasks. Since task setup takes a while, it is best if the maps take at least a minute to execute.</p>
<p>Thus, if you expect 10TB of input data and have a block size of 128MB, you'll end up with about 82,000 maps, unless
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setNumMapTasks(int)">
setNumMapTasks(int)</a> (which only provides a hint to the framework) is used to set it even higher.</p>
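<p>The figure above is simple block arithmetic, with one map per 128MB input block:</p>

```python
MB_PER_TB = 1024 * 1024          # megabytes per terabyte
input_mb = 10 * MB_PER_TB        # 10TB of input
block_mb = 128                   # block size in MB
num_maps = input_mb // block_mb  # one map task per input block
# num_maps is 81,920, i.e. the "about 82,000" quoted above
```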
<a name="N10694"></a><a name="Reducer"></a>
<h4>Reducer</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Reducer.html">
Reducer</a> reduces a set of intermediate values which share a key to a smaller set of values.</p>
<p>The number of reduces for the job is set by the user via <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setNumReduceTasks(int)">
JobConf.setNumReduceTasks(int)</a>.</p>
<p>Overall, <span class="codefrag">Reducer</span> implementations are passed the <span class="codefrag">JobConf</span> for the job via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConfigurable.html#configure(org.apache.hadoop.mapred.JobConf)">
JobConfigurable.configure(JobConf)</a> method and can override it to initialize themselves. The framework then calls the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Reducer.html#reduce(K2, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)">
reduce(WritableComparable, Iterator, OutputCollector, Reporter)</a> method for each <span class="codefrag">&lt;key, (list of values)&gt;</span> pair in the grouped inputs. Applications can then override the <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/io/Closeable.html#close()">Closeable.close()</a> method to perform any required cleanup.</p>
<p>
<span class="codefrag">Reducer</span> has 3 primary phases: shuffle, sort and reduce.
</p>
<a name="N106C4"></a><a name="Shuffle"></a>
<h5>Shuffle</h5>
<p>
Input to the <span class="codefrag">Reducer</span> is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.</p>
<a name="N106D0"></a><a name="Sort"></a>
<h5>Sort</h5>
<p>这个阶段,框架将按照key的值对<span class="codefrag">Reducer</span>的输入进行分组
(因为不同mapper的输出中可能会有相同的key)。</p>
<p>The shuffle and sort phases occur simultaneously: while map outputs are being fetched, they are merged.</p>
<a name="N106DF"></a><a name="Secondary+Sort"></a>
<h5>Secondary Sort</h5>
<p>If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, one may specify a <span class="codefrag">Comparator</span> via <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setOutputValueGroupingComparator(java.lang.Class)">
JobConf.setOutputValueGroupingComparator(Class)</a>. Since
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setOutputKeyComparatorClass(java.lang.Class)">
JobConf.setOutputKeyComparatorClass(Class)</a> can be used to control how the intermediate keys are grouped, the two can be used in conjunction to simulate a <em>secondary sort on values</em>.
</p>
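<p>The interplay of the two comparators can be illustrated without Hadoop at all. In this plain-Java sketch (all names are ours), the analogue of the output-key comparator sorts by (key, value), while the analogue of the grouping comparator compares keys only, so each group's values emerge in sorted order:</p>

```java
import java.util.Arrays;

// Illustrative sketch of secondary sort, not the Hadoop API.
public class SecondarySortSketch {
    public static String sortedGroups(String[][] pairs) {
        String[][] copy = Arrays.copyOf(pairs, pairs.length);
        // Analogue of setOutputKeyComparatorClass: order by key, then by value.
        Arrays.sort(copy, (x, y) -> {
            int c = x[0].compareTo(y[0]);
            return c != 0 ? c : x[1].compareTo(y[1]);
        });
        // Analogue of setOutputValueGroupingComparator: group on key only.
        StringBuilder sb = new StringBuilder();
        String prevKey = null;
        for (String[] p : copy) {
            if (!p[0].equals(prevKey)) {          // a new group starts here
                if (prevKey != null) sb.append('|');
                sb.append(p[0]).append(':').append(p[1]);
                prevKey = p[0];
            } else {
                sb.append(',').append(p[1]);      // same group: value arrives in order
            }
        }
        return sb.toString();
    }
}
```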
<a name="N106F8"></a><a name="Reduce"></a>
<h5>Reduce</h5>
<p>In this phase the framework calls the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Reducer.html#reduce(K2, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)">
reduce(WritableComparable, Iterator, OutputCollector, Reporter)</a> method once for each
<span class="codefrag">&lt;key, (list of values)&gt;</span> pair in the grouped inputs.</p>
<p>The output of the reduce task is typically written to the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/fs/FileSystem.html">
FileSystem</a> by calling
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/OutputCollector.html#collect(K, V)">
OutputCollector.collect(WritableComparable, Writable)</a>.</p>
<p>Applications can use the <span class="codefrag">Reporter</span> to report progress, set application-level status messages, update <span class="codefrag">Counters</span>, or just indicate that they are alive.</p>
<p>
The <span class="codefrag">Reducer</span>'s output is <em>not sorted</em>.</p>
<a name="N10725"></a><a name="%E9%9C%80%E8%A6%81%E5%A4%9A%E5%B0%91%E4%B8%AAReduce%EF%BC%9F"></a>
<h5>How Many Reduces?</h5>
<p>The right number of reduces seems to be <span class="codefrag">0.95</span> or <span class="codefrag">1.75</span> multiplied by
(&lt;<em>no. of nodes</em>&gt; *
<span class="codefrag">mapred.tasktracker.reduce.tasks.maximum</span>).
</p>
<p>With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave, achieving much better load balancing.</p>
<p>Increasing the number of reduces increases the framework overhead, but improves load balancing and lowers the cost of failures.</p>
<p>The scaling factors above are slightly less than whole numbers in order to reserve a few reduce slots in the framework for speculative tasks
and failed tasks.</p>
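<p>The suggested count boils down to simple arithmetic; this hypothetical helper (not a Hadoop API) makes it explicit:</p>

```java
// Hypothetical helper (not a Hadoop API) computing the suggested number of
// reduce tasks: factor * (number of nodes * mapred.tasktracker.reduce.tasks.maximum).
public class ReduceCount {
    public static int suggested(double factor, int nodes, int maxReducesPerNode) {
        return (int) (factor * nodes * maxReducesPerNode);
    }
}
```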
<a name="N10744"></a><a name="%E6%97%A0Reducer"></a>
<h5>Reducer NONE</h5>
<p>It is legal to set the number of reduce tasks to <em>zero</em> if no reduction is desired.</p>
<p>In this case the outputs of the map tasks go directly to the output path set by
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileOutputFormat.html#setOutputPath(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path)">
setOutputPath(Path)</a>. The framework does not sort the map outputs before writing them to the <span class="codefrag">FileSystem</span>.
</p>
<a name="N1075C"></a><a name="Partitioner"></a>
<h4>Partitioner</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Partitioner.html">
Partitioner</a> partitions the key space.</p>
<p>Partitioner controls the partitioning of the keys of the intermediate map outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the <span class="codefrag">m</span> reduce tasks the intermediate key (and hence the record) is sent to for reduction.
</p>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/lib/HashPartitioner.html">
HashPartitioner</a> is the default <span class="codefrag">Partitioner</span>.</p>
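<p>The hash-based partitioning described above amounts to masking off the sign bit of the key's hash and taking the remainder modulo the number of reduces; a minimal sketch (the signature mirrors, but is not taken from, the Hadoop API):</p>

```java
// Sketch of hash partitioning: mask off the sign bit of the key's hashCode
// and take it modulo the number of reduce tasks, yielding a stable
// partition index in [0, numReduceTasks).
public class HashPartitionSketch {
    public static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```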
<a name="N10778"></a><a name="Reporter"></a>
<h4>Reporter</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Reporter.html">
Reporter</a> is a facility for Map/Reduce applications to report progress, set application-level status messages
and update <span class="codefrag">Counters</span>.</p>
<p>
<span class="codefrag">Mapper</span> and <span class="codefrag">Reducer</span> implementations can use the <span class="codefrag">Reporter</span>
to report progress or just indicate that they are alive. In scenarios where the application takes a significant amount of time to process individual key/value pairs, this is crucial, since the framework might otherwise assume that the task has timed out and kill it. Another way to avoid this is to set the configuration parameter <span class="codefrag">mapred.task.timeout</span> to a high enough value (or even to zero, for no time-outs).
</p>
<p>Applications can use the <span class="codefrag">Reporter</span> to update <span class="codefrag">Counter</span>s.
</p>
<a name="N1079F"></a><a name="OutputCollector"></a>
<h4>OutputCollector</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/OutputCollector.html">
OutputCollector</a> is a generalized facility provided by the Map/Reduce framework to collect data output by a
<span class="codefrag">Mapper</span> or a <span class="codefrag">Reducer</span>
(either intermediate outputs or the output of the job).</p>
<p>Hadoop Map/Reduce comes bundled with a
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/lib/package-summary.html">library</a> of generally useful mappers, reducers and partitioners.</p>
<a name="N107BA"></a><a name="%E4%BD%9C%E4%B8%9A%E9%85%8D%E7%BD%AE"></a>
<h3 class="h4">Job Configuration</h3>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html">
JobConf</a> represents a Map/Reduce job configuration.</p>
<p>
<span class="codefrag">JobConf</span> is the primary interface for a user to describe a Map/Reduce job to the Hadoop framework for execution. The framework tries to faithfully execute the job as described by the <span class="codefrag">JobConf</span>, however:</p>
<ul>
<li>
Some parameters may have been marked <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/conf/Configuration.html#FinalParams">
final</a> by administrators, which means they cannot be altered.
</li>
<li>
While some job parameters are straightforward to set (e.g.
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setNumReduceTasks(int)">
setNumReduceTasks(int)</a>), others interact subtly with the rest of the framework and/or the job configuration and are more complex to set (e.g. <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setNumMapTasks(int)">
setNumMapTasks(int)</a>).
</li>
</ul>
<p>Typically, the <span class="codefrag">JobConf</span> is used to specify the <span class="codefrag">Mapper</span>, combiner (if any),
<span class="codefrag">Partitioner</span>, <span class="codefrag">Reducer</span>, <span class="codefrag">InputFormat</span> and
<span class="codefrag">OutputFormat</span> implementations. The <span class="codefrag">JobConf</span> also indicates the set of input files
(<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileInputFormat.html#setInputPaths(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path[])">setInputPaths(JobConf, Path...)</a>
/<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileInputFormat.html#addInputPath(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path)">addInputPath(JobConf, Path)</a>)
and (<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileInputFormat.html#setInputPaths(org.apache.hadoop.mapred.JobConf,%20java.lang.String)">setInputPaths(JobConf, String)</a>
/<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileInputFormat.html#addInputPath(org.apache.hadoop.mapred.JobConf,%20java.lang.String)">addInputPaths(JobConf, String)</a>)
and where the output files should be written
(<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileOutputFormat.html#setOutputPath(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path)">setOutputPath(Path)</a>).</p>
<p>
The <span class="codefrag">JobConf</span> is optionally used to specify other advanced facets of the job, such as the <span class="codefrag">Comparator</span> to be used,
files to be put in the <span class="codefrag">DistributedCache</span>, whether intermediate and/or job outputs are to be compressed (and how),
debugging via user-provided scripts (<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMapDebugScript(java.lang.String)">setMapDebugScript(String)</a>/<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setReduceDebugScript(java.lang.String)">setReduceDebugScript(String)</a>),
whether job tasks can be executed in a <em>speculative</em> manner
(<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMapSpeculativeExecution(boolean)">setMapSpeculativeExecution(boolean)</a>)/(<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setReduceSpeculativeExecution(boolean)">setReduceSpeculativeExecution(boolean)</a>),
the maximum number of attempts per task
(<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMaxMapAttempts(int)">setMaxMapAttempts(int)</a>/<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMaxReduceAttempts(int)">setMaxReduceAttempts(int)</a>),
the percentage of task failures the job can tolerate
(<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMaxMapTaskFailuresPercent(int)">setMaxMapTaskFailuresPercent(int)</a>/<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMaxReduceTaskFailuresPercent(int)">setMaxReduceTaskFailuresPercent(int)</a>)
and so on.</p>
<p>Of course, users can use
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/conf/Configuration.html#set(java.lang.String, java.lang.String)">set(String, String)</a>/<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/conf/Configuration.html#get(java.lang.String, java.lang.String)">get(String, String)</a>
to set/get arbitrary parameters needed by applications. However, use the <span class="codefrag">DistributedCache</span> for large amounts of read-only data.</p>
<a name="N1084C"></a><a name="%E4%BB%BB%E5%8A%A1%E7%9A%84%E6%89%A7%E8%A1%8C%E5%92%8C%E7%8E%AF%E5%A2%83"></a>
<h3 class="h4">Task Execution and Environment</h3>
<p>
The <span class="codefrag">TaskTracker</span> executes
<span class="codefrag">Mapper</span>/<span class="codefrag">Reducer</span> tasks as child processes in separate jvms.
</p>
<p>The child task inherits the environment of its parent <span class="codefrag">TaskTracker</span>. The user can specify additional options for the child jvm via the
<span class="codefrag">mapred.child.java.opts</span> configuration parameter in the JobConf, for example
to set non-standard paths for the run-time linker to search for shared libraries via <span class="codefrag">-Djava.library.path=&lt;&gt;</span>, etc. If <span class="codefrag">mapred.child.java.opts</span> contains the symbol <em>@taskid@</em>,
it is interpolated with the taskid value of the map/reduce task.</p>
<p>Here is an example with multiple arguments and substitutions, showing jvm GC logging, and the
start of a passwordless JVM JMX agent so that it can connect with jconsole and the like to watch child memory and threads and get thread dumps. It also sets the maximum heap size of the child jvm to 512MB
and adds an additional path to the child jvm's <span class="codefrag">java.library.path</span>.</p>
<p>
<span class="codefrag">&lt;property&gt;</span>
<br>
&nbsp;&nbsp;<span class="codefrag">&lt;name&gt;mapred.child.java.opts&lt;/name&gt;</span>
<br>
&nbsp;&nbsp;<span class="codefrag">&lt;value&gt;</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">
-Xmx512M -Djava.library.path=/home/mycompany/lib
-verbose:gc -Xloggc:/tmp/@taskid@.gc</span>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;<span class="codefrag">
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false</span>
<br>
&nbsp;&nbsp;<span class="codefrag">&lt;/value&gt;</span>
<br>
<span class="codefrag">&lt;/property&gt;</span>
</p>
<p>Users/admins can also specify the maximum virtual memory of a launched child task using <span class="codefrag">mapred.child.ulimit</span>. The value of <span class="codefrag">mapred.child.ulimit</span> is specified in kilobytes (KB) and must be greater than or equal to the -Xmx value passed to the JavaVM, or the VM might fail to start.</p>
<p>Note: <span class="codefrag">mapred.child.java.opts</span> is used only for configuring the child tasks launched by the task tracker. Configuring memory options for the daemons is documented in
<a href="cluster_setup.html#%E9%85%8D%E7%BD%AEHadoop%E5%AE%88%E6%8A%A4%E8%BF%9B%E7%A8%8B%E7%9A%84%E8%BF%90%E8%A1%8C%E7%8E%AF%E5%A2%83">
cluster_setup.html</a>.
</p>
<p>
<span class="codefrag"> ${mapred.local.dir}/taskTracker/</span> is the task tracker's local directory,
used to create localized caches and jobs. It can span multiple directories (across multiple disks), and files are then saved to a semi-randomly chosen directory among them. When a job starts, the task tracker creates a localized job directory according to the configuration, with the following directory structure:</p>
<ul>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/archive/</span> : The distributed cache. This directory holds the localized distributed cache; the localized cache is thus shared among all tasks and jobs.</li>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/</span> :
The localized job directory.
<ul>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/work/</span>:
The job-specific shared directory. The tasks can use this space as scratch space and share files among themselves. The directory is exposed to users through the configuration property <span class="codefrag">job.local.dir </span>. It can be accessed through the API <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#getJobLocalDir()">
JobConf.getJobLocalDir()</a>. It is also available as a system property, so users (running streaming, for example) can call <span class="codefrag">System.getProperty("job.local.dir")</span> to access the directory.
</li>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/</span>:
The jars directory, which holds the job's jar file and expanded jar. <span class="codefrag">job.jar</span> is the application's jar file; it is automatically distributed to each machine and expanded in this directory before the tasks start. The job.jar location is accessible to the application through the api
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#getJar()">
JobConf.getJar() </a>. The directory holding the expanded jar can be accessed via JobConf.getJar().getParent().
</li>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml</span>
A job.xml file, the localized generic job configuration file.
</li>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid</span>
Each task attempt has a directory <span class="codefrag">task-id</span>, which again has the following structure:
<ul>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml</span>
A job.xml file, the task-localized job configuration file. Task localization means that properties have been set that are specific to this particular task within the job. The localized properties are described below.
</li>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output</span>
A directory for intermediate output files. It holds the temporary map/reduce data generated by the framework, such as map output files.</li>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work</span>
The current working directory of the task.</li>
<li>
<span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp</span>
The temporary directory for the task. (Users can set the property <span class="codefrag">mapred.child.tmp</span>
to choose the temporary directory for map and reduce tasks. It defaults to <span class="codefrag">./tmp</span>. If the value is not an absolute path,
it is prepended with the task's working directory to form the task's temporary directory. Otherwise it is used directly.
The directory is created if it does not exist. The child java tasks are then executed with the option
<span class="codefrag">-Djava.io.tmpdir='the absolute path of the tmp dir'</span>.
For pipes and streaming, the temporary path is set via the environment variable <span class="codefrag">TMPDIR='the absolute path of the tmp dir'</span>.)
This directory is created if <span class="codefrag">mapred.child.tmp</span> has the value <span class="codefrag">./tmp</span>.</li>
</ul>
</li>
</ul>
</li>
</ul>
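<p>The resolution rule for <span class="codefrag">mapred.child.tmp</span> described above can be sketched as a small helper (the names are ours, and this simplification ignores platform-specific path handling):</p>

```java
// Sketch of the mapred.child.tmp resolution: a relative value is prepended
// with the task's working directory, an absolute value is used as-is.
// The default value is "./tmp".
public class ChildTmpResolver {
    public static String resolve(String taskWorkDir, String childTmp) {
        if (childTmp.startsWith("/")) {
            return childTmp;                      // absolute: used directly
        }
        if (childTmp.startsWith("./")) {
            childTmp = childTmp.substring(2);     // normalize the leading "./"
        }
        return taskWorkDir + "/" + childTmp;      // relative: prepend work dir
    }
}
```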
<p>The following properties are localized in each task's job configuration and used during its execution:</p>
<table class="ForrestTable" cellspacing="1" cellpadding="4">
<tr>
<th colspan="1" rowspan="1">Name</th><th colspan="1" rowspan="1">Type</th><th colspan="1" rowspan="1">Description</th>
</tr>
<tr>
<td colspan="1" rowspan="1">mapred.job.id</td><td colspan="1" rowspan="1">String</td><td colspan="1" rowspan="1">The job id</td>
</tr>
<tr>
<td colspan="1" rowspan="1">mapred.jar</td><td colspan="1" rowspan="1">String</td>
<td colspan="1" rowspan="1">The location of job.jar in the job directory</td>
</tr>
<tr>
<td colspan="1" rowspan="1">job.local.dir</td><td colspan="1" rowspan="1"> String</td>
<td colspan="1" rowspan="1">The job-specific shared scratch space</td>
</tr>
<tr>
<td colspan="1" rowspan="1">mapred.tip.id</td><td colspan="1" rowspan="1"> String</td>
<td colspan="1" rowspan="1"> task id</td>
</tr>
<tr>
<td colspan="1" rowspan="1">mapred.task.id</td><td colspan="1" rowspan="1"> String</td>
<td colspan="1" rowspan="1">The task attempt id</td>
</tr>
<tr>
<td colspan="1" rowspan="1">mapred.task.is.map</td><td colspan="1" rowspan="1"> boolean </td>
<td colspan="1" rowspan="1">Is this a map task</td>
</tr>
<tr>
<td colspan="1" rowspan="1">mapred.task.partition</td><td colspan="1" rowspan="1"> int </td>
<td colspan="1" rowspan="1">The id of the task within the job</td>
</tr>
<tr>
<td colspan="1" rowspan="1">map.input.file</td><td colspan="1" rowspan="1"> String</td>
<td colspan="1" rowspan="1">The filename the map is reading from</td>
</tr>
<tr>
<td colspan="1" rowspan="1">map.input.start</td><td colspan="1" rowspan="1"> long</td>
<td colspan="1" rowspan="1">The offset of the start of the map input split</td>
</tr>
<tr>
<td colspan="1" rowspan="1">map.input.length </td><td colspan="1" rowspan="1">long </td>
<td colspan="1" rowspan="1">The number of bytes in the map input split</td>
</tr>
<tr>
<td colspan="1" rowspan="1">mapred.work.output.dir</td><td colspan="1" rowspan="1"> String </td>
<td colspan="1" rowspan="1">The task's temporary output directory</td>
</tr>
</table>
<p>The standard output and standard error streams of the task are read by the TaskTracker and logged to
<span class="codefrag">${HADOOP_LOG_DIR}/userlogs</span>
</p>
<p>
<a href="#DistributedCache">DistributedCache</a>
can also be used to distribute both jars and native libraries for use in map and/or reduce tasks. The child jvm always has its
<em>current working directory</em> added to
<span class="codefrag">java.library.path</span> and <span class="codefrag">LD_LIBRARY_PATH</span>.
Hence the cached libraries can be loaded via
<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#loadLibrary(java.lang.String)">
System.loadLibrary</a> or
<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#load(java.lang.String)">
System.load</a>. More details on loading shared libraries through the distributed cache are documented in
<a href="native_libraries.html#%E4%BD%BF%E7%94%A8DistributedCache+%E5%8A%A0%E8%BD%BD%E6%9C%AC%E5%9C%B0%E5%BA%93">
native_libraries.html</a>.
</p>
<a name="N109E3"></a><a name="%E4%BD%9C%E4%B8%9A%E7%9A%84%E6%8F%90%E4%BA%A4%E4%B8%8E%E7%9B%91%E6%8E%A7"></a>
<h3 class="h4">Job Submission and Monitoring</h3>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobClient.html">
JobClient</a> is the primary interface by which user jobs interact with the <span class="codefrag">JobTracker</span>.
</p>
<p>
<span class="codefrag">JobClient</span> provides facilities to submit jobs, track their progress, access component tasks' logs, get the Map/Reduce cluster's status information and so on.
</p>
<p>The job submission process involves: </p>
<ol>
<li>Checking the input and output specifications of the job.</li>
<li>Computing the <span class="codefrag">InputSplit</span> values for the job.</li>
<li>
Setting up the requisite accounting information for the job's <span class="codefrag">DistributedCache</span>, if necessary.
</li>
<li>
Copying the job's jar and configuration to the Map/Reduce system directory on the <span class="codefrag">FileSystem</span>.
</li>
<li>
Submitting the job to the <span class="codefrag">JobTracker</span> and monitoring its status.
</li>
</ol>
<p>Job history files are logged to the "_logs/history/" sub-directory of a specified directory, set via <span class="codefrag">hadoop.job.history.user.location</span>, which defaults to the job's output directory. Hence, by default, the files are placed under mapred.output.dir/_logs/history. The user can stop logging by setting <span class="codefrag">hadoop.job.history.user.location</span> to <span class="codefrag">none</span>.
</p>
<p> The user can view a summary of the history logs in a specified directory using the following command:
<br>
<span class="codefrag">$ bin/hadoop job -history output-dir</span>
<br>
This command prints the job details, plus details of failed and killed tasks.<br>
More details about the job, such as successful tasks and the task attempts made for each task, can be viewed using the following command:
<br>
<span class="codefrag">$ bin/hadoop job -history all output-dir</span>
<br>
</p>
<p>The user can use
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/OutputLogFilter.html">OutputLogFilter</a>
to filter log files from the output directory listing.</p>
<p>Normally the user creates the application, describes various facets of the job via the <span class="codefrag">JobConf</span>,
and then uses the
<span class="codefrag">JobClient</span> to submit the job and monitor its progress.</p>
<a name="N10A44"></a><a name="%E4%BD%9C%E4%B8%9A%E7%9A%84%E6%8E%A7%E5%88%B6"></a>
<h4>Job Control</h4>
<p>Users may need to chain Map/Reduce jobs to accomplish complex tasks which cannot be done with a single Map/Reduce job. This is fairly easy, since the output of a job typically goes to the distributed file system, and that output can in turn be used as the input for the next job.
</p>
<p>However, this also means that the onus of ensuring that each job completes (success/failure) lies squarely on the clients. In such cases, the available job-control options are:
</p>
<ul>
<li>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobClient.html#runJob(org.apache.hadoop.mapred.JobConf)">
runJob(JobConf)</a>: Submits the job and returns only after it has completed.
</li>
<li>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobClient.html#submitJob(org.apache.hadoop.mapred.JobConf)">
submitJob(JobConf)</a>: Only submits the job; you then need to poll the returned
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/RunningJob.html">
RunningJob</a> handle for its status and make scheduling decisions accordingly.
</li>
<li>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setJobEndNotificationURI(java.lang.String)">
JobConf.setJobEndNotificationURI(String)</a>: Sets up a job-completion notification, thus avoiding polling.
</li>
</ul>
<a name="N10A6E"></a><a name="%E4%BD%9C%E4%B8%9A%E7%9A%84%E8%BE%93%E5%85%A5"></a>
<h3 class="h4">Job Input</h3>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/InputFormat.html">
InputFormat</a> describes the input specification of a Map/Reduce job.
</p>
<p>The Map/Reduce framework relies on the job's <span class="codefrag">InputFormat</span> to:
</p>
<ol>
<li>Validate the input specification of the job.</li>
<li>
Split the input file(s) into logical <span class="codefrag">InputSplit</span> instances,
each of which is then assigned to an individual
<span class="codefrag">Mapper</span>.
</li>
<li>
Provide the <span class="codefrag">RecordReader</span> implementation used to extract input records from the logical <span class="codefrag">InputSplit</span>
for processing by the <span class="codefrag">Mapper</span>.
</li>
</ol>
<p>The default behavior of file-based <span class="codefrag">InputFormat</span> implementations (typically sub-classes of
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileInputFormat.html">
FileInputFormat</a>)
is to split the input into <em>logical</em>
<span class="codefrag">InputSplit</span> instances based on the total size, in bytes, of the input files.
The <span class="codefrag">FileSystem</span> blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via <span class="codefrag">mapred.min.split.size</span>.</p>
<p>Clearly, considering record boundaries, logical splits based on input size are insufficient for many applications.
In such cases, the application should implement a <span class="codefrag">RecordReader</span>, which is responsible for respecting record boundaries and presenting a record-oriented view of the logical split to each task.
</p>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/TextInputFormat.html">
TextInputFormat</a> is the default <span class="codefrag">InputFormat</span>.</p>
<p>If <span class="codefrag">TextInputFormat</span> is the <span class="codefrag">InputFormat</span> of a given job
and the framework detects input files with the <em>.gz</em> or <em>.lzo</em> extension, they are automatically decompressed using the appropriate <span class="codefrag">CompressionCodec</span>.
However, note that compressed files with these extensions cannot be split, and each such file is processed in its entirety by a single mapper.
</p>
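<p>A file-based input format deciding splittability purely from the suffix, as described above, might look like this sketch (the helper name is ours):</p>

```java
// Sketch of the suffix-based splittability decision described above:
// .gz and .lzo inputs are handed whole to a single mapper.
public class SplittableCheck {
    public static boolean isSplittable(String path) {
        return !(path.endsWith(".gz") || path.endsWith(".lzo"));
    }
}
```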
<a name="N10AD2"></a><a name="InputSplit"></a>
<h4>InputSplit</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/InputSplit.html">
InputSplit</a> represents the data to be processed by an individual <span class="codefrag">Mapper</span>.</p>
<p>Typically <span class="codefrag">InputSplit</span> presents a byte-oriented view of the input, which the <span class="codefrag">RecordReader</span> then processes into a record-oriented view.
</p>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileSplit.html">
FileSplit</a> is the default <span class="codefrag">InputSplit</span>. It sets
<span class="codefrag">map.input.file</span> to the path of the input file for the logical split.
</p>
<a name="N10AF7"></a><a name="RecordReader"></a>
<h4>RecordReader</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/RecordReader.html">
RecordReader</a><span class="codefrag">InputSlit</span>读入<span class="codefrag">&lt;key, value&gt;</span>对。
</p>
<p>Typically, the <span class="codefrag">RecordReader</span> converts the byte-oriented view of the input provided by the <span class="codefrag">InputSplit</span>
into a record-oriented view for the <span class="codefrag">Mapper</span> implementation to process.
The <span class="codefrag">RecordReader</span> thus assumes the responsibility of processing record boundaries and presents the tasks with keys and values.
</p>
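<p>The record-boundary handling a line-oriented <span class="codefrag">RecordReader</span> must perform can be sketched in plain Java (an illustration, not the Hadoop implementation): a split that does not begin at byte 0 skips the partial first line, and the reader runs past the split's end to finish its last record, so every line belongs to exactly one split.</p>

```java
// Illustrative line reader over a byte range [start, end): a record belongs
// to the split that contains its first byte.
public class LineSplitReader {
    public static String readRecords(byte[] data, int start, int end) {
        int pos = start;
        if (start > 0) {              // the previous split finishes this partial line
            while (pos < data.length && data[pos] != '\n') pos++;
            pos++;                    // move past the newline
        }
        StringBuilder out = new StringBuilder();
        while (pos < data.length && pos < end) {
            int eol = pos;
            while (eol < data.length && data[eol] != '\n') eol++;
            if (out.length() > 0) out.append('|');
            out.append(new String(data, pos, eol - pos));
            pos = eol + 1;            // may run past 'end' to finish the last record
        }
        return out.toString();
    }
}
```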
<a name="N10B1A"></a><a name="%E4%BD%9C%E4%B8%9A%E7%9A%84%E8%BE%93%E5%87%BA"></a>
<h3 class="h4">Job Output</h3>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/OutputFormat.html">
OutputFormat</a> describes the output specification of a Map/Reduce job.
</p>
<p>The Map/Reduce framework relies on the job's <span class="codefrag">OutputFormat</span> to:
</p>
<ol>
<li>
Validate the output specification of the job; for example, check that the output directory does not already exist.
</li>
<li>
Provide the <span class="codefrag">RecordWriter</span> implementation used to write the job's output files.
Output files are stored on a <span class="codefrag">FileSystem</span>.
</li>
</ol>
<p>
<span class="codefrag">TextOutputFormat</span> is the default
<span class="codefrag">OutputFormat</span>.</p>
<a name="N10B43"></a><a name="%E4%BB%BB%E5%8A%A1%E7%9A%84Side-Effect+File"></a>
<h4>Task Side-Effect Files</h4>
<p>In some applications, component tasks need to create and/or write side-files, which differ from the actual job output files.
</p>
<p>In such cases there could be issues with two instances of the same <span class="codefrag">Mapper</span> or <span class="codefrag">Reducer</span> (for example, speculative tasks) simultaneously opening or writing the same file on the
<span class="codefrag">FileSystem</span>. Hence the application writer would have to pick unique filenames per task attempt (not just per task, since each task may be attempted several times), using the attempt id, e.g. <span class="codefrag">task_200709221812_0001_m_000000_0</span>.
</p>
<p>To avoid these issues the Map/Reduce framework establishes and maintains, for every task attempt, a special
<span class="codefrag">${mapred.output.dir}/_temporary/_${taskid}</span> sub-directory on the <span class="codefrag">FileSystem</span> where the attempt's output goes, accessible via
<span class="codefrag">${mapred.work.output.dir}</span>.
On successful completion of a task attempt, only the files under <span class="codefrag">${mapred.output.dir}/_temporary/_${taskid}</span> are <em>promoted</em> to <span class="codefrag">${mapred.output.dir}</span>. Of course, the framework discards the sub-directories of unsuccessful task attempts. This process is completely transparent to the application.</p>
<p>The application writer can take advantage of this feature during task execution, e.g.
by obtaining the <span class="codefrag">${mapred.work.output.dir}</span> directory via <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileOutputFormat.html#getWorkOutputPath(org.apache.hadoop.mapred.JobConf)">
FileOutputFormat.getWorkOutputPath()</a>
and creating any side-files the task needs under it; the framework promotes them on success just the same, eliminating the need to pick a unique name per task attempt.
</p>
<p>Note: the value of <span class="codefrag">${mapred.work.output.dir}</span> during execution of a particular task attempt is actually
<span class="codefrag">${mapred.output.dir}/_temporary/_{$taskid}</span>, and this value is set by the Map/Reduce framework.
So, to use this feature, simply create any side-files in the path returned by <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileOutputFormat.html#getWorkOutputPath(org.apache.hadoop.mapred.JobConf)">
FileOutputFormat.getWorkOutputPath() </a>
from a map/reduce task.
</p>
<p>The entire discussion also holds for map-only jobs (with no reduces), in which case the map outputs go directly to HDFS.
</p>
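<p>The promotion scheme above is pure path manipulation; this hypothetical sketch shows where a side-file lives during an attempt and after a successful one:</p>

```java
// Hypothetical sketch of the side-effect file paths described above.
public class SideEffectPaths {
    // ${mapred.work.output.dir} during a task attempt:
    public static String workOutputDir(String outputDir, String taskid) {
        return outputDir + "/_temporary/_" + taskid;
    }
    // Where a side-file ends up once the framework promotes the attempt's
    // _temporary sub-directory into ${mapred.output.dir} on success:
    public static String promotedPath(String outputDir, String fileName) {
        return outputDir + "/" + fileName;
    }
}
```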
<a name="N10B8B"></a><a name="RecordWriter"></a>
<h4>RecordWriter</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/RecordWriter.html">
RecordWriter</a> writes the output <span class="codefrag">&lt;key, value&gt;</span>
pairs to an output file.</p>
<p>RecordWriter implementations write the job's outputs to the
<span class="codefrag">FileSystem</span>.</p>
<a name="N10BA2"></a><a name="%E5%85%B6%E4%BB%96%E6%9C%89%E7%94%A8%E7%9A%84%E7%89%B9%E6%80%A7"></a>
<h3 class="h4">Other Useful Features</h3>
<a name="N10BA8"></a><a name="Counters"></a>
<h4>Counters</h4>
<p>
<span class="codefrag">Counters</span> represent global counters, defined either by the Map/Reduce framework or by applications.
Each <span class="codefrag">Counter</span> can be of any
<span class="codefrag">Enum</span> type. Counters of a particular <span class="codefrag">Enum</span> are bunched into groups of type <span class="codefrag">Counters.Group</span>.</p>
<p>Applications can define arbitrary <span class="codefrag">Counters</span> (of <span class="codefrag">Enum</span> type) and update them in the <span class="codefrag">map</span> and/or
<span class="codefrag">reduce</span> methods via
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Reporter.html#incrCounter(java.lang.Enum, long)">
Reporter.incrCounter(Enum, long)</a> or
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/Reporter.html#incrCounter(java.lang.String, java.lang.String, long amount)">
Reporter.incrCounter(String, String, long)</a>.
These counters are then globally aggregated by the framework.
</p>
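<p>An application-defined counter group can be mimicked in plain Java with an enum-keyed map (this class is illustrative, not the Hadoop <span class="codefrag">Counters</span> implementation):</p>

```java
import java.util.EnumMap;

// Minimal sketch of enum-keyed counters like the ones an application defines
// and bumps via Reporter.incrCounter(Enum, long).
public class CounterSketch {
    public enum MyCounters { MALFORMED_RECORDS, PROCESSED_RECORDS }

    private final EnumMap<MyCounters, Long> counts = new EnumMap<>(MyCounters.class);

    public void incrCounter(MyCounters c, long amount) {
        counts.merge(c, amount, Long::sum);   // accumulate per counter
    }

    public long get(MyCounters c) {
        return counts.getOrDefault(c, 0L);
    }
}
```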
<a name="N10BD4"></a><a name="DistributedCache"></a>
<h4>DistributedCache</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html">
DistributedCache</a> distributes application-specific, large, read-only files efficiently.
</p>
<p>
<span class="codefrag">DistributedCache</span> is a facility provided by the Map/Reduce framework to cache files
(text, archives, jars and so on) needed by applications.
</p>
<p>Applications specify the files to be cached via urls (hdfs://) in the <span class="codefrag">JobConf</span>.
The <span class="codefrag">DistributedCache</span> assumes that the files specified via hdfs:// urls are already present on the
<span class="codefrag">FileSystem</span>.</p>
<p>The Map/Reduce framework copies the necessary files to the slave nodes before any of the job's tasks are executed there.
It is efficient because each job's files are copied only once, and they are cached for slave nodes that do not yet have them.
</p>
<p>
<span class="codefrag">DistributedCache</span> tracks the modification timestamps of the cached files.
Clearly, cached files should not be modified by the application or externally while the job is executing.
</p>
<p>
<span class="codefrag">DistributedCache</span> can be used to distribute simple read-only data or text files, as well as more complex types such as archives and jars. Archives (zip, tar, tgz and tar.gz files) are <em>un-archived</em> on the slave nodes.
These files can have <em>execution permissions</em> set.</p>
<p>Files can be distributed by setting the property <span class="codefrag">mapred.cache.{files|archives}</span>.
If more than one file/archive has to be distributed, they can be added as comma-separated paths. The properties can also be set via the APIs
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html#addCacheFile(java.net.URI,%20org.apache.hadoop.conf.Configuration)">
DistributedCache.addCacheFile(URI,conf)</a>/
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html#addCacheArchive(java.net.URI,%20org.apache.hadoop.conf.Configuration)">
DistributedCache.addCacheArchive(URI,conf)</a> and
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html#setCacheFiles(java.net.URI[],%20org.apache.hadoop.conf.Configuration)">
DistributedCache.setCacheFiles(URIs,conf)</a>/
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html#setCacheArchives(java.net.URI[],%20org.apache.hadoop.conf.Configuration)">
DistributedCache.setCacheArchives(URIs,conf)</a>,
where each URI is of the form
<span class="codefrag">hdfs://host:port/absolute-path#link-name</span>.
In Streaming, files can be distributed through the command-line options
<span class="codefrag">-cacheFile/-cacheArchive</span>.</p>
<p>
Optionally, users can direct the <span class="codefrag">DistributedCache</span> to create symlinks to the cached files in the
<em>current working directory</em> of the task via the <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html#createSymlink(org.apache.hadoop.conf.Configuration)">
DistributedCache.createSymlink(Configuration)</a> method,
or by setting the configuration property <span class="codefrag">mapred.create.symlink</span> to <span class="codefrag">yes</span>.
The DistributedCache uses the fragment of the URI as the name of the symlink.
For example, for the URI <span class="codefrag">hdfs://namenode:port/lib.so.1#lib.so</span>,
a link named <span class="codefrag">lib.so</span> appears in the task's current working directory,
pointing to the cached copy of <span class="codefrag">lib.so.1</span>.
</p>
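<p>The link-name rule (take the URI fragment) can be sketched with <span class="codefrag">java.net.URI</span>; the helper and its fallback for fragment-less URIs are ours:</p>

```java
import java.net.URI;

// Sketch: the symlink created in the task's working directory is named
// after the URI fragment; without a fragment, fall back to the last path
// component.
public class CacheLinkName {
    public static String linkName(String cacheUri) {
        String frag = URI.create(cacheUri).getFragment();
        if (frag != null) {
            return frag;                                   // e.g. "lib.so"
        }
        return cacheUri.substring(cacheUri.lastIndexOf('/') + 1);
    }
}
```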
<p>
<span class="codefrag">DistributedCache</span> can also be used as a rudimentary
software distribution mechanism in map/reduce tasks. It can be used to distribute both jars and native libraries. The
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html#addArchiveToClassPath(org.apache.hadoop.fs.Path,%20org.apache.hadoop.conf.Configuration)">
DistributedCache.addArchiveToClassPath(Path, Configuration)</a> or
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/filecache/DistributedCache.html#addFileToClassPath(org.apache.hadoop.fs.Path,%20org.apache.hadoop.conf.Configuration)">
DistributedCache.addFileToClassPath(Path, Configuration)</a> APIs can be used to
cache files/jars and also add them to the <em>classpath</em> of the child jvm. The same can be achieved by setting the configuration property
<span class="codefrag">mapred.job.classpath.{files|archives}</span>. Cached files can likewise be used to distribute and load native libraries.
</p>
<a name="N10C50"></a><a name="Tool"></a>
<h4>Tool</h4>
<p>The
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/util/Tool.html">Tool</a>
interface supports the handling of generic Hadoop command-line options.
</p>
<p>
<span class="codefrag">Tool</span> is the standard for any Map/Reduce tool or application. The application should handle only its custom arguments
and delegate the standard command-line options via
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/util/ToolRunner.html#run(org.apache.hadoop.util.Tool, java.lang.String[])"> ToolRunner.run(Tool, String[])</a>
to
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/util/GenericOptionsParser.html">
GenericOptionsParser</a>.
</p>
<p>
The generic Hadoop command-line options are:<br>
<span class="codefrag">
-conf &lt;configuration file&gt;
</span>
<br>
<span class="codefrag">
-D &lt;property=value&gt;
</span>
<br>
<span class="codefrag">
-fs &lt;local|namenode:port&gt;
</span>
<br>
<span class="codefrag">
-jt &lt;local|jobtracker:port&gt;
</span>
</p>
<a name="N10C81"></a><a name="IsolationRunner"></a>
<h4>IsolationRunner</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/IsolationRunner.html">
IsolationRunner</a> is a utility to help debug Map/Reduce programs.</p>
<p>To use the <span class="codefrag">IsolationRunner</span>, first set
<span class="codefrag">keep.failed.tasks.files</span> to <span class="codefrag">true</span>
(also see <span class="codefrag">keep.tasks.files.pattern</span>).</p>
<p>
Next, go to the node on which the failed task ran, change into the
<span class="codefrag">TaskTracker</span>'s local directory, and run the
<span class="codefrag">IsolationRunner</span>:<br>
<span class="codefrag">$ cd &lt;local path&gt;/taskTracker/${taskid}/work</span>
<br>
<span class="codefrag">
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
</span>
</p>
<p>
<span class="codefrag">IsolationRunner</span> will run the failed task in a single, debuggable jvm, over precisely the same input.
</p>
<a name="N10CB4"></a><a name="Profiling"></a>
<h4>Profiling</h4>
<p>Profiling is a utility that uses the built-in java profiler to obtain profiling reports for a representative sample (2 or 3) of map or reduce task runs.</p>
<p>Users can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property <span class="codefrag">mapred.task.profile</span>.
The value can also be set using the api <a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setProfileEnabled(boolean)">
JobConf.setProfileEnabled(boolean)</a>. If the value is set to <span class="codefrag">true</span>,
task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for a job.</p>
<p>Once profiling is enabled, the user can use the configuration property
<span class="codefrag">mapred.task.profile.{maps|reduces}</span>
to set the ranges of map/reduce tasks to profile. The value can also be set using the api
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setProfileTaskRange(boolean,%20java.lang.String)">
JobConf.setProfileTaskRange(boolean,String)</a>.
By default, the specified range is <span class="codefrag">0-2</span>.</p>
<p>Users can also specify the profiler configuration arguments by setting the configuration property <span class="codefrag">mapred.task.profile.params</span>.
The value can be set using the api
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setProfileParams(java.lang.String)">
JobConf.setProfileParams(String)</a>. If the string contains a <span class="codefrag">%s</span>,
it will be replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default profiling
parameters are
<span class="codefrag">-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s</span>
</p>
<a name="N10CE8"></a><a name="%E8%B0%83%E8%AF%95"></a>
<h4>Debugging</h4>
<p>The Map/Reduce framework provides a facility to run user-provided scripts for debugging.
When a map/reduce task fails, the user-provided script can be run to post-process the task's logs (for example the task's stdout, stderr, syslog and job configuration file). The stdout and stderr of the debug script are written out as diagnostic files and, if needed, can also be displayed in the job UI.</p>
<p> In the following sections we discuss how to submit a debug script along with a job. To submit the script,
it first has to be distributed, and it also has to be set in the job configuration.
</p>
<a name="N10CF4"></a><a name="%E5%A6%82%E4%BD%95%E5%88%86%E5%8F%91%E8%84%9A%E6%9C%AC%E6%96%87%E4%BB%B6%EF%BC%9A"></a>
<h5> How to distribute the script file: </h5>
<p>The user needs to use the
<a href="mapred_tutorial.html#DistributedCache">DistributedCache</a>
mechanism to <em>distribute</em> and <em>symlink</em> the script file.</p>
<a name="N10D08"></a><a name="%E5%A6%82%E4%BD%95%E6%8F%90%E4%BA%A4%E8%84%9A%E6%9C%AC%EF%BC%9A"></a>
<h5> How to submit the script:</h5>
<p> A quick way to submit a debug script is to set values for the
"mapred.map.task.debug.script" and "mapred.reduce.task.debug.script"
properties, for debugging map and reduce tasks respectively. These properties can also be set via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMapDebugScript(java.lang.String)">
JobConf.setMapDebugScript(String) </a> and
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setReduceDebugScript(java.lang.String)">
JobConf.setReduceDebugScript(String) </a> APIs. For streaming, a debug script can be submitted with the
command-line options -mapdebug and -reducedebug, for debugging map and reduce tasks respectively.
</p>
<p>The arguments to the script are the task's stdout, stderr, syslog and job configuration files. The debug command, run on the node where the map/reduce task failed, is:
<br>
<span class="codefrag"> $script $stdout $stderr $syslog $jobconf </span>
</p>
<p> Pipes programs have the C++ program name as a fifth argument.
Thus for pipes programs the command is<br>
<span class="codefrag">$script $stdout $stderr $syslog $jobconf $program </span>
</p>
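<p>As a hedged illustration (no part of this script is supplied by Hadoop itself), a debug script following the calling convention above might look like the function below. It simply surfaces the most useful diagnostics from the files the framework passes in; the file contents and names used here are hypothetical.</p>

```shell
# Hypothetical debug script body, written as a function so it can be tried
# locally. Hadoop invokes the real script as:
#   $script $stdout $stderr $syslog $jobconf
debug_task() {
    stdout_file=$1
    stderr_file=$2
    syslog_file=$3
    jobconf_file=$4

    echo "=== last 20 lines of task stderr ==="
    tail -n 20 "$stderr_file"

    echo "=== exceptions found in task syslog ==="
    grep -i "exception" "$syslog_file" || echo "(none found)"

    echo "=== job configuration file: $jobconf_file ==="
}
```

Everything the script writes to its own stdout and stderr becomes part of the task's diagnostics, as described above.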
<a name="N10D2A"></a><a name="%E9%BB%98%E8%AE%A4%E8%A1%8C%E4%B8%BA"></a>
<h5> Default Behavior </h5>
<p> For pipes, a default script is run which processes core dumps under gdb,
prints the stack trace and gives information about the running threads.</p>
<a name="N10D35"></a><a name="JobControl"></a>
<h4>JobControl</h4>
<p>
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/jobcontrol/package-summary.html">
JobControl</a> is a utility which encapsulates a set of Map/Reduce jobs and their dependencies.
</p>
<a name="N10D42"></a><a name="%E6%95%B0%E6%8D%AE%E5%8E%8B%E7%BC%A9"></a>
<h4>Data Compression</h4>
<p>The Hadoop Map/Reduce framework provides facilities for applications to compress both the intermediate map outputs and the final job outputs (i.e. the output of the reduces). It also comes bundled with
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/io/compress/CompressionCodec.html">
CompressionCodec</a> implementations for the
<a href="http://www.zlib.net/">zlib</a> and <a href="http://www.oberhumer.com/opensource/lzo/">lzo</a> compression algorithms.
The <a href="http://www.gzip.org/">gzip</a> file format is also supported.
</p>
<p>Hadoop also provides native implementations of the above compression codecs, for reasons of both performance (zlib) and the non-availability of Java libraries (lzo). More details are available
<a href="native_libraries.html">here</a>.</p>
<a name="N10D62"></a><a name="%E4%B8%AD%E9%97%B4%E8%BE%93%E5%87%BA"></a>
<h5>Intermediate Outputs</h5>
<p>Applications can control compression of the intermediate map outputs via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setCompressMapOutput(boolean)">
JobConf.setCompressMapOutput(boolean)</a> API, and specify the
<span class="codefrag">CompressionCodec</span> to be used via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/JobConf.html#setMapOutputCompressorClass(java.lang.Class)">
JobConf.setMapOutputCompressorClass(Class)</a> API.
</p>
<a name="N10D77"></a><a name="%E4%BD%9C%E4%B8%9A%E8%BE%93%E5%87%BA"></a>
<h5>Job Outputs</h5>
<p>Applications can control whether the job outputs are to be compressed via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileOutputFormat.html#setCompressOutput(org.apache.hadoop.mapred.JobConf,%20boolean)">
FileOutputFormat.setCompressOutput(JobConf, boolean)</a>
API, and specify the <span class="codefrag">CompressionCodec</span> to be used via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/FileOutputFormat.html#setOutputCompressorClass(org.apache.hadoop.mapred.JobConf,%20java.lang.Class)">
FileOutputFormat.setOutputCompressorClass(JobConf, Class)</a> API.</p>
<p>If the job outputs are to be stored in the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/SequenceFileOutputFormat.html">
SequenceFileOutputFormat</a>, the required
<span class="codefrag">SequenceFile.CompressionType</span> (i.e. <span class="codefrag">RECORD</span> /
<span class="codefrag">BLOCK</span> - defaults to <span class="codefrag">RECORD</span>) can be specified via the
<a href="http://hadoop.apache.org/core/docs/r0.18.2/api/org/apache/hadoop/mapred/SequenceFileOutputFormat.html#setOutputCompressionType(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.io.SequenceFile.CompressionType)">
SequenceFileOutputFormat.setOutputCompressionType(JobConf,
SequenceFile.CompressionType)</a> API.
</p>
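<p>The compression settings above can be combined as follows. This fragment is a sketch, assuming an existing <span class="codefrag">JobConf</span> named <span class="codefrag">conf</span>; it requires the Hadoop libraries and is not runnable on its own. The choice of <span class="codefrag">GzipCodec</span> here is illustrative, not prescribed by the tutorial.</p>

```java
// Sketch: compress both intermediate map output and the final job output
// (assumes an existing JobConf 'conf').
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(org.apache.hadoop.io.compress.GzipCodec.class);

FileOutputFormat.setCompressOutput(conf, true);
FileOutputFormat.setOutputCompressorClass(conf,
    org.apache.hadoop.io.compress.GzipCodec.class);

// For SequenceFile job output, also pick the compression granularity:
conf.setOutputFormat(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressionType(conf,
    SequenceFile.CompressionType.BLOCK);
```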
</div>
<a name="N10DA6"></a><a name="%E4%BE%8B%E5%AD%90%EF%BC%9AWordCount+v2.0"></a>
<h2 class="h3">Example: WordCount v2.0</h2>
<div class="section">
<p>Here is a more complete <span class="codefrag">WordCount</span> which uses many of the features provided by the Map/Reduce framework that we have discussed so far.
</p>
<p>This example needs HDFS to be up and running, especially for the
<span class="codefrag">DistributedCache</span>-related features. Hence it only works with a
<a href="quickstart.html#SingleNodeSetup">pseudo-distributed</a> or
<a href="quickstart.html#Fully-Distributed+Operation">fully-distributed</a>
Hadoop installation.</p>
<a name="N10DC0"></a><a name="%E6%BA%90%E4%BB%A3%E7%A0%81-N10DC0"></a>
<h3 class="h4">Source Code</h3>
<table class="ForrestTable" cellspacing="1" cellpadding="4">
<tr>
<th colspan="1" rowspan="1"></th>
<th colspan="1" rowspan="1">WordCount.java</th>
</tr>
<tr>
<td colspan="1" rowspan="1">1.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">package org.myorg;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">2.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">3.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.io.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">4.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import java.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">5.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">6.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.fs.Path;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">7.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.filecache.DistributedCache;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">8.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.conf.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">9.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.io.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">10.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.mapred.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">11.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">import org.apache.hadoop.util.*;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">12.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">13.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">public class WordCount extends Configured implements Tool {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">14.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">15.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Map extends MapReduceBase
implements Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">16.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">17.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
static enum Counters { INPUT_WORDS }
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">18.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">19.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
private final static IntWritable one = new IntWritable(1);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">20.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private Text word = new Text();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">21.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">22.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private boolean caseSensitive = true;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">23.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private Set&lt;String&gt; patternsToSkip = new HashSet&lt;String&gt;();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">24.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">25.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private long numRecords = 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">26.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private String inputFile;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">27.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">28.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">public void configure(JobConf job) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">29.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">30.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">inputFile = job.get("map.input.file");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">31.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">32.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">if (job.getBoolean("wordcount.skip.patterns", false)) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">33.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">Path[] patternsFiles = new Path[0];</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">34.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">try {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">35.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
patternsFiles = DistributedCache.getLocalCacheFiles(job);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">36.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">} catch (IOException ioe) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">37.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
System.err.println("Caught exception while getting cached files: "
+ StringUtils.stringifyException(ioe));
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">38.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">39.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">for (Path patternsFile : patternsFiles) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">40.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">parseSkipFile(patternsFile);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">41.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">42.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">43.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">44.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">45.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">private void parseSkipFile(Path patternsFile) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">46.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">try {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">47.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
BufferedReader fis =
new BufferedReader(new FileReader(patternsFile.toString()));
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">48.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">String pattern = null;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">49.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while ((pattern = fis.readLine()) != null) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">50.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">patternsToSkip.add(pattern);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">51.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">52.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">} catch (IOException ioe) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">53.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
System.err.println("Caught exception while parsing the cached file '" +
patternsFile + "' : " +
StringUtils.stringifyException(ioe));
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">54.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">55.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">56.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">57.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void map(LongWritable key, Text value,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">58.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
String line =
(caseSensitive) ? value.toString() :
value.toString().toLowerCase();
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">59.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">60.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">for (String pattern : patternsToSkip) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">61.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">line = line.replaceAll(pattern, "");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">62.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">63.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">64.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">StringTokenizer tokenizer = new StringTokenizer(line);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">65.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (tokenizer.hasMoreTokens()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">66.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">word.set(tokenizer.nextToken());</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">67.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(word, one);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">68.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">reporter.incrCounter(Counters.INPUT_WORDS, 1);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">69.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">70.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">71.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">if ((++numRecords % 100) == 0) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">72.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
reporter.setStatus("Finished processing " + numRecords +
" records " + "from the input file: " +
inputFile);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">73.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">74.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">75.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">76.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">77.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static class Reduce extends MapReduceBase implements
Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">78.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
public void reduce(Text key, Iterator&lt;IntWritable&gt; values,
OutputCollector&lt;Text, IntWritable&gt; output,
Reporter reporter) throws IOException {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">79.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">int sum = 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">80.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">while (values.hasNext()) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">81.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">sum += values.next().get();</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">82.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">83.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">output.collect(key, new IntWritable(sum));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">84.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">85.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">86.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">87.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">public int run(String[] args) throws Exception {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">88.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
JobConf conf = new JobConf(getConf(), WordCount.class);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">89.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setJobName("wordcount");</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">90.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">91.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputKeyClass(Text.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">92.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputValueClass(IntWritable.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">93.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">94.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setMapperClass(Map.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">95.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setCombinerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">96.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setReducerClass(Reduce.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">97.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">98.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setInputFormat(TextInputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">99.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">conf.setOutputFormat(TextOutputFormat.class);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">100.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">101.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
List&lt;String&gt; other_args = new ArrayList&lt;String&gt;();
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">102.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">for (int i=0; i &lt; args.length; ++i) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">103.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">if ("-skip".equals(args[i])) {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">104.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">105.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
conf.setBoolean("wordcount.skip.patterns", true);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">106.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">} else {</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">107.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">other_args.add(args[i]);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">108.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">109.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">110.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">111.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">112.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">113.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">114.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">JobClient.runJob(conf);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">115.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">return 0;</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">116.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">117.</td>
<td colspan="1" rowspan="1"></td>
</tr>
<tr>
<td colspan="1" rowspan="1">118.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">
public static void main(String[] args) throws Exception {
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">119.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">
int res = ToolRunner.run(new Configuration(), new WordCount(),
args);
</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">120.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;&nbsp;&nbsp;
<span class="codefrag">System.exit(res);</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">121.</td>
<td colspan="1" rowspan="1">
&nbsp;&nbsp;
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">122.</td>
<td colspan="1" rowspan="1">
<span class="codefrag">}</span>
</td>
</tr>
<tr>
<td colspan="1" rowspan="1">123.</td>
<td colspan="1" rowspan="1"></td>
</tr>
</table>
<a name="N11522"></a><a name="%E8%BF%90%E8%A1%8C%E6%A0%B7%E4%BE%8B"></a>
<h3 class="h4">Sample Runs</h3>
<p>Sample text-files as input:</p>
<p>
<span class="codefrag">$ bin/hadoop dfs -ls /usr/joe/wordcount/input/</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">/usr/joe/wordcount/input/file02</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01</span>
<br>
<span class="codefrag">Hello World, Bye World!</span>
<br>
<br>
<span class="codefrag">$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02</span>
<br>
<span class="codefrag">Hello Hadoop, Goodbye to hadoop.</span>
</p>
<p>Run the application:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
/usr/joe/wordcount/input /usr/joe/wordcount/output
</span>
</p>
<p>Output:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">Bye 1</span>
<br>
<span class="codefrag">Goodbye 1</span>
<br>
<span class="codefrag">Hadoop, 1</span>
<br>
<span class="codefrag">Hello 2</span>
<br>
<span class="codefrag">World! 1</span>
<br>
<span class="codefrag">World, 1</span>
<br>
<span class="codefrag">hadoop. 1</span>
<br>
<span class="codefrag">to 1</span>
<br>
</p>
<p>Notice that the inputs differ from the first version we looked at, and how they affect the outputs.
</p>
<p>Now, let's plug in a pattern file, distributed via the <span class="codefrag">DistributedCache</span>, that lists the word patterns to be ignored.
</p>
<p>
<span class="codefrag">$ hadoop dfs -cat /user/joe/wordcount/patterns.txt</span>
<br>
<span class="codefrag">\.</span>
<br>
<span class="codefrag">\,</span>
<br>
<span class="codefrag">\!</span>
<br>
<span class="codefrag">to</span>
<br>
</p>
<p>Run it again, this time with more options:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
-Dwordcount.case.sensitive=true /usr/joe/wordcount/input
/usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
</span>
</p>
<p>As expected, the output is:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">Bye 1</span>
<br>
<span class="codefrag">Goodbye 1</span>
<br>
<span class="codefrag">Hadoop 1</span>
<br>
<span class="codefrag">Hello 2</span>
<br>
<span class="codefrag">World 2</span>
<br>
<span class="codefrag">hadoop 1</span>
<br>
</p>
<p>Run it once more, this time switching off case-sensitivity:</p>
<p>
<span class="codefrag">
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount
-Dwordcount.case.sensitive=false /usr/joe/wordcount/input
/usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
</span>
</p>
<p>Output:</p>
<p>
<span class="codefrag">
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
</span>
<br>
<span class="codefrag">bye 1</span>
<br>
<span class="codefrag">goodbye 1</span>
<br>
<span class="codefrag">hadoop 2</span>
<br>
<span class="codefrag">hello 2</span>
<br>
<span class="codefrag">world 2</span>
<br>
</p>
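<p>The per-line processing behind these runs (lines 58-67 of the listing: optional case-folding, stripping the skip patterns, then tokenizing) can be exercised outside Hadoop. The following is a plain-Java sketch written for this tutorial; the class <span class="codefrag">SkipPatternDemo</span> and its <span class="codefrag">tokens</span> method are illustrative names, not part of the example's code.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.StringTokenizer;

// Stand-alone illustration of WordCount v2.0's per-line map logic:
// optionally lower-case the line, remove every skip pattern with
// replaceAll, then tokenize on whitespace.
public class SkipPatternDemo {

    static List<String> tokens(String line, boolean caseSensitive,
                               Collection<String> patternsToSkip) {
        if (!caseSensitive) {
            line = line.toLowerCase();
        }
        for (String pattern : patternsToSkip) {
            // Each pattern is a regular expression, hence "\\." etc.
            line = line.replaceAll(pattern, "");
        }
        List<String> words = new ArrayList<String>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            words.add(tokenizer.nextToken());
        }
        return words;
    }

    public static void main(String[] args) {
        // The same patterns as patterns.txt above: \. \, \! and "to"
        List<String> skip = Arrays.asList("\\.", "\\,", "\\!", "to");
        // Case-sensitivity off, as in the last run; "Hadoop" and
        // "hadoop." both collapse to the token "hadoop".
        System.out.println(
            tokens("Hello Hadoop, Goodbye to hadoop.", false, skip));
    }
}
```

This is why the final run counts <span class="codefrag">hadoop 2</span> where the case-sensitive run counted <span class="codefrag">Hadoop 1</span> and <span class="codefrag">hadoop 1</span>.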
<a name="N115F6"></a><a name="%E7%A8%8B%E5%BA%8F%E8%A6%81%E7%82%B9"></a>
<h3 class="h4">Highlights</h3>
<p>
The second version of <span class="codefrag">WordCount</span> improves upon the previous one by using some features offered by the Map/Reduce framework:
</p>
<ul>
<li>
Demonstrates how applications can access configuration parameters in the <span class="codefrag">configure</span> method of the <span class="codefrag">Mapper</span> (and <span class="codefrag">Reducer</span>) implementations (lines 28-43).
</li>
<li>
Demonstrates how the <span class="codefrag">DistributedCache</span> can be used to distribute read-only data needed by the job.
Here it allows the user to specify word patterns to skip while counting (line 104).
</li>
<li>
Demonstrates the utility of the <span class="codefrag">Tool</span> interface and the <span class="codefrag">GenericOptionsParser</span> to handle generic Hadoop command-line options
(lines 87-116, 119).
</li>
<li>
Demonstrates how applications can use <span class="codefrag">Counters</span> (line 68) and how they can set application-specific status information via the <span class="codefrag">Reporter</span> instance passed to the <span class="codefrag">map</span> (and <span class="codefrag">reduce</span>) method (line 72).
</li>
</ul>
</div>
<p>
<em>Java and JNI are trademarks or registered trademarks of Sun Microsystems, Inc. in the United States and other countries.</em>
</p>
</div>
<!--+
|end content
+-->
<div class="clearboth">&nbsp;</div>
</div>
<div id="footer">
<!--+
|start bottomstrip
+-->
<div class="lastmodified">
<script type="text/javascript"><!--
document.write("Last Published: " + document.lastModified);
// --></script>
</div>
<div class="copyright">
Copyright &copy;
2007 <a href="http://www.apache.org/licenses/">The Apache Software Foundation.</a>
</div>
<!--+
|end bottomstrip
+-->
</div>
</body>
</html>