| <html> |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <body> |
| |
| Classes for performing various counting and aggregations. |
| <p /> |
| <h2><a name="Aggregate"></a>Aggregate framework </h2> |
| <p /> |
| Generally speaking, to implement an application using the Map/Reduce |
| model, the developer needs to implement Map and Reduce functions (and possibly |
| a Combine function). However, for many applications related to counting and |
| statistics computing, these functions have very similar |
| characteristics. This package implements |
| those patterns. In particular, it provides a generic mapper class, |
| a reducer class and a combiner class, and a set of built-in value aggregators. |
| It also provides a generic utility class, ValueAggregatorJob, that offers |
| a static method that creates map/reduce jobs: |
| <blockquote> |
| <pre> |
| public static Job createValueAggregatorJob(String args[]) throws IOException; |
| </pre> |
| </blockquote> |
| To call this method, the user passes in arguments specifying the input |
| directories, the output directory, the number of reducers, the input data format |
| (textinputformat or sequencefileinputformat), and a file specifying the user plugin |
| class(es) to be loaded by the mapper. |
| A user plugin class is responsible for specifying which |
| aggregators to use and which values go to which aggregator. |
| A plugin class must implement the following interface: |
| <blockquote> |
| <pre> |
| public interface ValueAggregatorDescriptor { |
|   public ArrayList&lt;Entry&lt;Text, Text&gt;&gt; generateKeyValPairs(Object key, Object val); |
|   public void configure(Configuration conf); |
| } |
| </pre> |
| </blockquote> |
| The generateKeyValPairs method generates aggregation key/value pairs for an |
| input key/value pair. Each aggregation key encodes two pieces of information: |
| the aggregation type and the aggregation ID. |
| The value is the value to be aggregated onto the aggregation ID according |
| to the aggregation type. Here is a simple example user plugin class for |
| counting the words in the input texts: |
| <blockquote> |
| <pre> |
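| import java.util.ArrayList; |
| import java.util.Map.Entry; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorBaseDescriptor; |
| |
| public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor { |
|   public ArrayList&lt;Entry&lt;Text, Text&gt;&gt; generateKeyValPairs(Object key, Object val) { |
|     // Split the record on runs of spaces/tabs; each word is an aggregation ID. |
|     String[] words = val.toString().split("[ \t]+"); |
|     ArrayList&lt;Entry&lt;Text, Text&gt;&gt; retv = new ArrayList&lt;Entry&lt;Text, Text&gt;&gt;(); |
|     for (int i = 0; i &lt; words.length; i++) { |
|       if (words[i].isEmpty()) continue; // leading whitespace yields an empty token |
|       // Add one to the LongValueSum aggregator whose ID is this word. |
|       retv.add(generateEntry(LONG_VALUE_SUM, words[i], ONE)); |
|     } |
|     return retv; |
|   } |
|   public void configure(Configuration conf) {} |
| } |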
| </pre> |
| </blockquote> |
| In the above code, LONG_VALUE_SUM is a string denoting the aggregation type |
| LongValueSum, which sums over long values. ONE denotes the value "1". |
| The call generateEntry(LONG_VALUE_SUM, words[i], ONE) interprets the first |
| argument as an aggregation type, the second as an aggregation ID, and the |
| third as the value to be aggregated. The generated key will look like |
| "LongValueSum:XXXX", where XXXX is the string value of words[i], and the value |
| will be "1". The mapper calls generateKeyValPairs(Object key, Object val) for |
| each input key/value pair to generate the desired aggregation ID/value pairs. |
| The downstream combiner/reducer interprets each of these pairs |
| as adding one to the aggregator XXXX. |
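The key encoding just described can be illustrated without Hadoop. The following is a minimal sketch, not Hadoop's implementation: the class `AggregationKeySketch` and its methods `generateEntry`, `decode` and `wordCountPairs` are hypothetical, values are plain strings instead of `Text`, and the type separator is assumed to be ":" as in the "LongValueSum:XXXX" example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free model of how an aggregation key packs a type and an ID.
final class AggregationKeySketch {
    // Assumed separator, matching the "LongValueSum:XXXX" form described above.
    static final String TYPE_SEPARATOR = ":";
    static final String LONG_VALUE_SUM = "LongValueSum";
    static final String ONE = "1";

    // Plays the role of generateEntry(type, id, value): key = type + separator + id.
    static Map.Entry<String, String> generateEntry(String type, String id, String value) {
        return new AbstractMap.SimpleImmutableEntry<>(type + TYPE_SEPARATOR + id, value);
    }

    // Inverse operation: split a key at the first separator into {type, id}.
    static String[] decode(String key) {
        int pos = key.indexOf(TYPE_SEPARATOR);
        return new String[] {key.substring(0, pos), key.substring(pos + TYPE_SEPARATOR.length())};
    }

    // Word-count descriptor logic: one ("LongValueSum:<word>", "1") pair per word.
    static List<Map.Entry<String, String>> wordCountPairs(String line) {
        List<Map.Entry<String, String>> retv = new ArrayList<>();
        for (String word : line.split("[ \t]+")) {
            if (!word.isEmpty()) { // a leading space or tab produces one empty token
                retv.add(generateEntry(LONG_VALUE_SUM, word, ONE));
            }
        }
        return retv;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : wordCountPairs("to be or not to be")) {
            String[] parts = decode(e.getKey());
            System.out.println(e.getKey() + " -> type=" + parts[0] + ", id=" + parts[1]
                + ", value=" + e.getValue());
        }
    }
}
```

Because the type travels inside the key, a single generic reducer can later recover both the aggregation type and the ID from the key alone.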
| <p /> |
| Class ValueAggregatorBaseDescriptor is a base class that user plugin classes |
| can extend. Here is the XML fragment specifying the user plugin class: |
| <blockquote> |
| <pre> |
| &lt;property&gt; |
|   &lt;name&gt;aggregator.descriptor.num&lt;/name&gt; |
|   &lt;value&gt;1&lt;/value&gt; |
| &lt;/property&gt; |
| &lt;property&gt; |
|   &lt;name&gt;aggregator.descriptor.0&lt;/name&gt; |
|   &lt;value&gt;UserDefined,org.apache.hadoop.mapreduce.lib.aggregate.examples.WordCountAggregatorDescriptor&lt;/value&gt; |
| &lt;/property&gt; |
| </pre> |
| </blockquote> |
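How a job might read these two properties can be sketched as follows. This is an illustration under stated assumptions rather than Hadoop's loader: the class `DescriptorSpecSketch` is hypothetical, the job configuration is modeled as a plain `Map<String, String>`, and the sketch only extracts class names from entries of the form `UserDefined,<class name>` instead of instantiating them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: reads the descriptor properties from a plain map standing in for the job configuration.
final class DescriptorSpecSketch {
    // Returns the class names of all "UserDefined,<class name>" entries, in index order.
    static List<String> userDefinedClassNames(Map<String, String> conf) {
        int num = Integer.parseInt(conf.getOrDefault("aggregator.descriptor.num", "0"));
        List<String> classNames = new ArrayList<>();
        for (int i = 0; i < num; i++) {
            String spec = conf.get("aggregator.descriptor." + i);
            if (spec == null) {
                continue; // no entry for this index: nothing to load
            }
            // Split "UserDefined,<class name>" at the first comma only.
            String[] parts = spec.split(",", 2);
            if (parts.length == 2 && parts[0].trim().equals("UserDefined")) {
                classNames.add(parts[1].trim());
            }
        }
        return classNames;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("aggregator.descriptor.num", "1");
        conf.put("aggregator.descriptor.0",
            "UserDefined,org.apache.hadoop.mapreduce.lib.aggregate.examples.WordCountAggregatorDescriptor");
        System.out.println(userDefinedClassNames(conf));
    }
}
```

The count property bounds the loop, so an entry with an index at or above `aggregator.descriptor.num` is simply ignored in this sketch.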
| Class ValueAggregatorBaseDescriptor itself provides a |
| default implementation for generateKeyValPairs: |
| <blockquote> |
| <pre> |
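| public ArrayList&lt;Entry&lt;Text, Text&gt;&gt; generateKeyValPairs(Object key, Object val) { |
|   ArrayList&lt;Entry&lt;Text, Text&gt;&gt; retv = new ArrayList&lt;Entry&lt;Text, Text&gt;&gt;(); |
|   String countType = LONG_VALUE_SUM; |
|   String id = "record_count"; |
|   // Every record, whatever its content, adds one to the "record_count" counter. |
|   retv.add(generateEntry(countType, id, ONE)); |
|   return retv; |
| } |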
| </pre> |
| </blockquote> |
| Thus, if no user plugin class is specified, the default behavior of the |
| map/reduce job is to count the number of records (lines, for text input) in the input files. |
| <p /> |
| At runtime, the mapper invokes generateKeyValPairs on each configured descriptor |
| for each input key/value pair, and emits the generated key/value pairs: |
| <blockquote> |
| <pre> |
| public void map(WritableComparable key, Writable value, |
|                 Context context) throws IOException, InterruptedException { |
|   // Ask every configured descriptor for its aggregation key/value pairs. |
|   Iterator iter = this.aggregatorDescriptorList.iterator(); |
|   while (iter.hasNext()) { |
|     ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next(); |
|     Iterator&lt;Entry&lt;Text, Text&gt;&gt; ens = ad.generateKeyValPairs(key, value).iterator(); |
|     while (ens.hasNext()) { |
|       Entry&lt;Text, Text&gt; en = ens.next(); |
|       // Emit "type:id" as the key and the value to aggregate as the value. |
|       context.write(en.getKey(), en.getValue()); |
|     } |
|   } |
| } |
| </pre> |
| </blockquote> |
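The map loop above can be modeled without Hadoop to show what the mapper emits when several descriptors are configured. This sketch is hypothetical throughout: `MapPhaseSketch.Descriptor` is a simplified, string-typed stand-in for `ValueAggregatorDescriptor`, `context.write` is replaced by appending to a list, and the two descriptors mimic the default record counter and the word-count plugin described earlier.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free model of the map loop over several descriptors.
final class MapPhaseSketch {
    // Simplified, string-typed stand-in for ValueAggregatorDescriptor.
    interface Descriptor {
        List<Map.Entry<String, String>> generateKeyValPairs(Object key, Object value);
    }

    static Map.Entry<String, String> pair(String key, String value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Mirrors the default descriptor: one "LongValueSum:record_count" -> "1" per record.
    static final Descriptor RECORD_COUNT =
        (key, value) -> Collections.singletonList(pair("LongValueSum:record_count", "1"));

    // Word-count descriptor: one "LongValueSum:<word>" -> "1" per word.
    static final Descriptor WORD_COUNT = (key, value) -> {
        List<Map.Entry<String, String>> retv = new ArrayList<>();
        for (String word : value.toString().split("[ \t]+")) {
            if (!word.isEmpty()) {
                retv.add(pair("LongValueSum:" + word, "1"));
            }
        }
        return retv;
    };

    // The map() loop: every descriptor sees every record, and all of its pairs are emitted.
    static List<Map.Entry<String, String>> map(Object key, Object value, List<Descriptor> descriptors) {
        List<Map.Entry<String, String>> emitted = new ArrayList<>();
        for (Descriptor ad : descriptors) {
            emitted.addAll(ad.generateKeyValPairs(key, value));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Descriptor> descriptors = Arrays.asList(RECORD_COUNT, WORD_COUNT);
        for (Map.Entry<String, String> e : map(0L, "hello hello world", descriptors)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Note that the mapper does no aggregation itself: "hello" appears twice in the output, and summing is left to the combiner and reducer.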
| The reducer creates an aggregator object for each key and its list of values, |
| performs the appropriate aggregation, |
| and finally emits the aggregator's result: |
| <blockquote> |
| <pre> |
| public void reduce(Text key, Iterable&lt;Text&gt; values, |
|                    Context context) throws IOException, InterruptedException { |
|   // Split the incoming key "type:id" into the aggregation type and the ID. |
|   String keyStr = key.toString(); |
|   int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR); |
|   String type = keyStr.substring(0, pos); |
|   keyStr = keyStr.substring(pos + ValueAggregatorDescriptor.TYPE_SEPARATOR.length()); |
|   // Create a fresh aggregator of that type and feed it every value for this key. |
|   ValueAggregator aggregator = |
|       ValueAggregatorBaseDescriptor.generateValueAggregator(type); |
|   for (Text value : values) { |
|     aggregator.addNextValue(value); |
|   } |
|   // Emit the ID (without the type prefix) and the aggregator's report. |
|   String val = aggregator.getReport(); |
|   context.write(new Text(keyStr), new Text(val)); |
| } |
| </pre> |
| </blockquote> |
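The reducer's dispatch on the type prefix can be sketched end to end in plain Java. Everything here is a hypothetical stand-in rather than Hadoop code: `Aggregator` mirrors the `addNextValue`/`getReport` pair used above, only LongValueSum-like and UniqValueCount-like behavior is modeled, the separator is assumed to be ":", and the grouping by key that Hadoop's shuffle performs is done with a `TreeMap`.

```java
import java.util.*;

// Hypothetical, Hadoop-free model of the reduce side: group by key, dispatch on the type prefix.
final class ReducePhaseSketch {
    // Stand-in for ValueAggregator: accumulate values, then report a string.
    interface Aggregator {
        void addNextValue(String value);
        String getReport();
    }

    // Behaves like LongValueSum: sums long values.
    static final class LongSum implements Aggregator {
        private long sum = 0;
        public void addNextValue(String value) { sum += Long.parseLong(value); }
        public String getReport() { return Long.toString(sum); }
    }

    // Behaves like UniqValueCount: counts distinct values.
    static final class UniqCount implements Aggregator {
        private final Set<String> seen = new HashSet<>();
        public void addNextValue(String value) { seen.add(value); }
        public String getReport() { return Integer.toString(seen.size()); }
    }

    // Hypothetical factory in the role of generateValueAggregator(type).
    static Aggregator newAggregator(String type) {
        switch (type) {
            case "LongValueSum": return new LongSum();
            case "UniqValueCount": return new UniqCount();
            default: throw new IllegalArgumentException("unsupported type: " + type);
        }
    }

    static Map.Entry<String, String> pair(String key, String value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Groups map output by key (the shuffle's job), then reduces each group to "id\treport".
    static List<String> reduceAll(List<Map.Entry<String, String>> mapOutput) {
        SortedMap<String, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, String> e : mapOutput) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<String>> g : groups.entrySet()) {
            String keyStr = g.getKey();
            int pos = keyStr.indexOf(':');
            Aggregator aggregator = newAggregator(keyStr.substring(0, pos));
            for (String value : g.getValue()) {
                aggregator.addNextValue(value);
            }
            // Like the reducer above, the emitted key drops the type prefix.
            output.add(keyStr.substring(pos + 1) + "\t" + aggregator.getReport());
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> mapOutput = Arrays.asList(
            pair("LongValueSum:hello", "1"),
            pair("LongValueSum:hello", "1"),
            pair("UniqValueCount:visitors", "alice"),
            pair("UniqValueCount:visitors", "alice"),
            pair("UniqValueCount:visitors", "bob"));
        for (String line : reduceAll(mapOutput)) {
            System.out.println(line);
        }
    }
}
```

Because the type is read from each key, one reducer class can serve every aggregation type in the same job.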
| To use a combiner, the aggregator for every aggregation type must be |
| associative and commutative. |
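| The following types are supported: <ul> |
| <li> LongValueSum: sum over long values |
| </li> <li> LongValueMax / LongValueMin: maximum / minimum of long values |
| </li> <li> StringValueMax / StringValueMin: lexicographic maximum / minimum of string values |
| </li> <li> DoubleValueSum: sum over float/double values |
| </li> <li> UniqValueCount: count the number of distinct values |
| </li> <li> ValueHistogram: compute the histogram of values, together with the number of |
| distinct values and the minimum, median, maximum, average and standard deviation |
| of the value frequencies |
| </li></ul> |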
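Why associativity and commutativity matter can be checked with a small experiment: split the values for one key across two hypothetical map tasks, pre-aggregate each split as a combiner would, then finish the aggregation in the reducer. This is an illustrative sketch, not Hadoop's combiner code; `CombinerSketch` and its methods are hypothetical. It also shows a subtlety for distinct counts: the combiner has to forward the distinct values themselves, because adding per-task distinct counts double-counts values seen by more than one task.

```java
import java.util.*;

// Hypothetical model of combiner-then-reducer aggregation over several map-task splits.
final class CombinerSketch {
    // LongValueSum-style: each task emits a partial sum; the reducer sums the partial sums.
    static long sumViaCombiner(List<List<Long>> splits) {
        long total = 0;
        for (List<Long> split : splits) {
            long partial = 0;
            for (long v : split) partial += v;   // combiner on one map task
            total += partial;                    // reducer over combiner outputs
        }
        return total;
    }

    // UniqValueCount-style: each task forwards its distinct values; the reducer unions them.
    static int distinctViaCombiner(List<List<String>> splits) {
        Set<String> union = new HashSet<>();
        for (List<String> split : splits) {
            union.addAll(new HashSet<>(split)); // combiner output: distinct values, not a count
        }
        return union.size();
    }

    // Incorrect alternative: summing per-task distinct counts double-counts shared values.
    static int naiveDistinctCount(List<List<String>> splits) {
        int total = 0;
        for (List<String> split : splits) total += new HashSet<>(split).size();
        return total;
    }

    public static void main(String[] args) {
        List<List<Long>> longSplits = Arrays.asList(Arrays.asList(1L, 1L, 1L), Arrays.asList(1L, 1L));
        List<List<String>> userSplits = Arrays.asList(
            Arrays.asList("alice", "bob", "alice"), Arrays.asList("bob", "carol"));
        System.out.println(sumViaCombiner(longSplits));      // 5
        System.out.println(distinctViaCombiner(userSplits)); // 3
        System.out.println(naiveDistinctCount(userSplits));  // 4
    }
}
```

Since addition and set union are both associative and commutative, the result does not depend on how records are split across tasks or in which order the partial results arrive.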
| <p /> |
| <h2><a name="Create_and_run"></a> Create and run an application </h2> |
| <p /> |
| To create an application, the user needs to do the following things: |
| <p /> |
| 1. Implement a user plugin: |
| <blockquote> |
| <pre> |
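| import java.util.ArrayList; |
| import java.util.Map.Entry; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorBaseDescriptor; |
| |
| public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor { |
|   public ArrayList&lt;Entry&lt;Text, Text&gt;&gt; generateKeyValPairs(Object key, Object val) { |
|     ArrayList&lt;Entry&lt;Text, Text&gt;&gt; retv = new ArrayList&lt;Entry&lt;Text, Text&gt;&gt;(); |
|     // Add entries such as generateEntry(LONG_VALUE_SUM, id, ONE) for this record. |
|     return retv; |
|   } |
|   public void configure(Configuration conf) {} |
| } |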
| </pre> |
| </blockquote> |
| |
| 2. Create an XML file specifying the user plugin. |
| <p /> |
| 3. Compile your Java class and create a jar file, say wc.jar. |
| |
| <p /> |
| Finally, run the job: |
| <blockquote> |
| <pre> |
| hadoop jar wc.jar org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorJob indirs outdir numofreducers textinputformat|sequencefileinputformat spec_file |
| </pre> |
| </blockquote> |
| <p /> |
| |
| |
| </body> |
| </html> |