<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<body>
Classes for performing various kinds of counting and aggregation.
<p />
<h2><a name="Aggregate"></a>Aggregate framework </h2>
<p />
Generally speaking, to implement an application using the Map/Reduce
model, the developer needs to implement Map and Reduce functions (and possibly
a Combine function). However, for many applications that compute counts and
statistics, these functions have very similar
characteristics. This package implements
those common patterns. In particular, the package provides a generic mapper class,
a reducer class, a combiner class, and a set of built-in value aggregators.
It also provides a generic utility class, ValueAggregatorJob, that offers
a static function for creating map/reduce jobs:
<blockquote>
<pre>
public static Job createValueAggregatorJob(String args&#91;]) throws IOException;
</pre>
</blockquote>
To call this function, the user needs to pass in arguments specifying the input
directories, the output directory, the number of reducers, the input data format
(textinputformat or sequencefileinputformat), and a file specifying the user plugin
class(es) to be loaded by the mapper.
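For instance, a minimal driver that calls this function might look like the
following (a sketch; the driver class name is illustrative):
<blockquote>
<pre>
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorJob;

public class AggregateDriver {
  // args: indirs outdir numofreducers textinputformat|sequencefileinputformat spec&#95;file
  public static void main(String args&#91;]) throws Exception {
    Job job &#61; ValueAggregatorJob.createValueAggregatorJob(args);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
</pre>
</blockquote>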
A user plugin class is responsible for specifying which
aggregators to use and which values to feed to which aggregators.
A plugin class must implement the following interface:
<blockquote>
<pre>
public interface ValueAggregatorDescriptor {
public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object value);
public void configure(Configuration conf);
}
</pre>
</blockquote>
Function generateKeyValPairs will generate aggregation key/value pairs for the
input key/value pair. Each aggregation key encodes two pieces of information:
the aggregation type and aggregation ID.
The value is the value to be aggregated onto the aggregation ID according
to the aggregation type. Here is a simple example user plugin class for
counting the words in the input texts:
<blockquote>
<pre>
public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
  public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
    String words&#91;] &#61; val.toString().split(&#34; &#124;\t&#34;);
    ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
    for (int i &#61; 0; i &#60; words.length; i++) {
      retv.add(generateEntry(LONG&#95;VALUE&#95;SUM, words&#91;i], ONE));
    }
    return retv;
  }
  public void configure(Configuration conf) {}
}
</pre>
</blockquote>
In the above code, LONG_VALUE_SUM is a string denoting the aggregation type
LongValueSum, which sums over long values. ONE denotes the string "1".
The call generateEntry(LONG_VALUE_SUM, words[i], ONE) will interpret the first
argument as an aggregation type, the second as an aggregation ID, and the
third argument as the value to be aggregated. The output key will look like
"LongValueSum:xxxx", where xxxx is the string value of words[i], and the value will
be "1". The mapper will call generateKeyValPairs(Object key, Object val) for
each input key/value pair to generate the desired aggregation id/value pairs.
The downstream combiner/reducer will interpret these pairs
as adding one to the aggregator xxxx.
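For example, if an input value is the line "hello hello world", the plugin
above will emit these key/value pairs:
<blockquote>
<pre>
(&#34;LongValueSum:hello&#34;, &#34;1&#34;)
(&#34;LongValueSum:hello&#34;, &#34;1&#34;)
(&#34;LongValueSum:world&#34;, &#34;1&#34;)
</pre>
</blockquote>
The downstream reducer will strip the type prefix and emit ("hello", "2")
and ("world", "1").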
<p />
Class ValueAggregatorBaseDescriptor is a base class that user plugin classes
can extend. Here is the XML fragment specifying the user plugin class:
<blockquote>
<pre>
&#60;property&#62;
&#60;name&#62;aggregator.descriptor.num&#60;/name&#62;
&#60;value&#62;1&#60;/value&#62;
&#60;/property&#62;
&#60;property&#62;
&#60;name&#62;aggregator.descriptor.0&#60;/name&#62;
&#60;value&#62;UserDefined,org.apache.hadoop.mapreduce.lib.aggregate.examples.WordCountAggregatorDescriptor&#60;/value&#62;
&#60;/property&#62;
</pre>
</blockquote>
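To load more than one plugin class, increase the count and add further indexed
entries; for example (the second class name here is purely illustrative):
<blockquote>
<pre>
&#60;property&#62;
  &#60;name&#62;aggregator.descriptor.num&#60;/name&#62;
  &#60;value&#62;2&#60;/value&#62;
&#60;/property&#62;
&#60;property&#62;
  &#60;name&#62;aggregator.descriptor.1&#60;/name&#62;
  &#60;value&#62;UserDefined,org.apache.hadoop.mapreduce.lib.aggregate.examples.AnotherAggregatorDescriptor&#60;/value&#62;
&#60;/property&#62;
</pre>
</blockquote>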
Class ValueAggregatorBaseDescriptor itself provides a
default implementation for generateKeyValPairs:
<blockquote>
<pre>
public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
  ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
  String countType &#61; LONG&#95;VALUE&#95;SUM;
  String id &#61; &#34;record&#95;count&#34;;
  retv.add(generateEntry(countType, id, ONE));
  return retv;
}
</pre>
</blockquote>
Thus, if no user plugin class is specified, the default behavior of the
map/reduce job is to count the number of records (lines) in the input files.
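In that case the job output would consist of a single record whose key is
"record_count" and whose value is the total number of input records, for
example (assuming 1000 input records and the default text output format):
<blockquote>
<pre>
record&#95;count	1000
</pre>
</blockquote>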
<p />
At runtime, the mapper will invoke the generateKeyValPairs function for
each input key/value pair and emit the generated key/value pairs:
<blockquote>
<pre>
public void map(WritableComparable key, Writable value,
                Context context) throws IOException, InterruptedException {
  Iterator iter &#61; this.aggregatorDescriptorList.iterator();
  while (iter.hasNext()) {
    ValueAggregatorDescriptor ad &#61; (ValueAggregatorDescriptor) iter.next();
    Iterator&#60;Entry&#62; ens &#61; ad.generateKeyValPairs(key, value).iterator();
    while (ens.hasNext()) {
      Entry en &#61; ens.next();
      context.write((WritableComparable) en.getKey(), (Writable) en.getValue());
    }
  }
}
</pre>
</blockquote>
The reducer will create an aggregator object for each key and its list of
values, and perform the appropriate aggregation.
At the end, it will emit the aggregator's result:
<blockquote>
<pre>
public void reduce(WritableComparable key, Iterable&#60;Text&#62; values,
                   Context context) throws IOException, InterruptedException {
  String keyStr &#61; key.toString();
  int pos &#61; keyStr.indexOf(ValueAggregatorDescriptor.TYPE&#95;SEPARATOR);
  String type &#61; keyStr.substring(0, pos);
  keyStr &#61; keyStr.substring(pos + ValueAggregatorDescriptor.TYPE&#95;SEPARATOR.length());
  ValueAggregator aggregator &#61;
      ValueAggregatorBaseDescriptor.generateValueAggregator(type);
  for (Text value : values) {
    aggregator.addNextValue(value);
  }
  String val &#61; aggregator.getReport();
  key &#61; new Text(keyStr);
  context.write(key, new Text(val));
}
</pre>
</blockquote>
In order to be able to use a combiner, all the aggregators must be
associative and commutative (see the sketch after this list).
The following types are supported: <ul>
<li> LongValueSum: sum over long values
</li> <li> DoubleValueSum: sum over float/double values
</li> <li> UniqValueCount: count the number of distinct values
</li> <li> ValueHistogram: compute the histogram of values, along with the minimum,
maximum, median, average, and standard deviation of numeric values
</li></ul>
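Associativity and commutativity are what allow the combiner to pre-aggregate
partial results on the map side. Here is a rough sketch of the aggregator
contract, using the LongValueSum type described above (values arrive as strings):
<blockquote>
<pre>
LongValueSum agg &#61; new LongValueSum();
agg.addNextValue(&#34;1&#34;);
agg.addNextValue(&#34;1&#34;);
agg.addNextValue(&#34;1&#34;);
// getReport() returns the accumulated sum as a string: &#34;3&#34;.
// Because addition is associative and commutative, partial sums
// produced by the combiner can safely be summed again by the reducer.
String sum &#61; agg.getReport();
</pre>
</blockquote>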
<p />
<h2><a name="Create_and_run"></a> Create and run an application </h2>
<p />
To create an application, the user needs to do the following things:
<p />
1. Implement a user plugin:
<blockquote>
<pre>
import org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorBaseDescriptor;

public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
  public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
    // generate aggregation key/value pairs, as shown in the example above
    ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
    // ... add entries via generateEntry(...) ...
    return retv;
  }
  public void configure(Configuration conf) {}
}
</pre>
</blockquote>
2. Create an XML file specifying the user plugin, like the fragment shown above.
<p />
3. Compile your Java class and create a jar file, say wc.jar.
<p />
Finally, run the job:
<blockquote>
<pre>
hadoop jar wc.jar org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorJob indirs outdir numofreducers textinputformat|sequencefileinputformat spec_file
</pre>
</blockquote>
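For example (the paths, the number of reducers, and the spec file name are
illustrative):
<blockquote>
<pre>
hadoop jar wc.jar org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorJob \
    /user/me/wc/input /user/me/wc/output 2 textinputformat aggregator-spec.xml
</pre>
</blockquote>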
<p />
</body>
</html>