| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.contrib.utils.join; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.WritableUtils; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.OutputCollector; |
| import org.apache.hadoop.mapred.Reporter; |
| |
| /** |
| * This abstract class serves as the base class for the reducer class of a data |
| * join job. The reduce function will first group the values according to their |
| * input tags, and then compute the cross product of over the groups. For each |
| * tuple in the cross product, it calls the following method, which is expected |
| * to be implemented in a subclass. |
| * |
| * protected abstract TaggedMapOutput combine(Object[] tags, Object[] values); |
| * |
| * The above method is expected to produce one output value from an array of |
| * records of different sources. The user code can also perform filtering here. |
| * It can return null if it decides to the records do not meet certain |
| * conditions. |
| * |
| */ |
| public abstract class DataJoinReducerBase extends JobBase { |
| |
| protected Reporter reporter = null; |
| |
| private long maxNumOfValuesPerGroup = 100; |
| |
| protected long largestNumOfValues = 0; |
| |
| protected long numOfValues = 0; |
| |
| protected long collected = 0; |
| |
| protected JobConf job; |
| |
| public void close() throws IOException { |
| if (this.reporter != null) { |
| this.reporter.setStatus(super.getReport()); |
| } |
| } |
| |
| public void configure(JobConf job) { |
| super.configure(job); |
| this.job = job; |
| this.maxNumOfValuesPerGroup = job.getLong("datajoin.maxNumOfValuesPerGroup", 100); |
| } |
| |
| /** |
| * The subclass can provide a different implementation on ResetableIterator. |
| * This is necessary if the number of values in a reduce call is very high. |
| * |
| * The default provided here uses ArrayListBackedIterator |
| * |
| * @return an Object of ResetableIterator. |
| */ |
| protected ResetableIterator createResetableIterator() { |
| return new ArrayListBackedIterator(); |
| } |
| |
| /** |
| * This is the function that re-groups values for a key into sub-groups based |
| * on a secondary key (input tag). |
| * |
| * @param arg1 |
| * @return |
| */ |
| private SortedMap<Object, ResetableIterator> regroup(Object key, |
| Iterator arg1, Reporter reporter) throws IOException { |
| this.numOfValues = 0; |
| SortedMap<Object, ResetableIterator> retv = new TreeMap<Object, ResetableIterator>(); |
| TaggedMapOutput aRecord = null; |
| while (arg1.hasNext()) { |
| this.numOfValues += 1; |
| if (this.numOfValues % 100 == 0) { |
| reporter.setStatus("key: " + key.toString() + " numOfValues: " |
| + this.numOfValues); |
| } |
| if (this.numOfValues > this.maxNumOfValuesPerGroup) { |
| continue; |
| } |
| aRecord = ((TaggedMapOutput) arg1.next()).clone(job); |
| Text tag = aRecord.getTag(); |
| ResetableIterator data = retv.get(tag); |
| if (data == null) { |
| data = createResetableIterator(); |
| retv.put(tag, data); |
| } |
| data.add(aRecord); |
| } |
| if (this.numOfValues > this.largestNumOfValues) { |
| this.largestNumOfValues = numOfValues; |
| LOG.info("key: " + key.toString() + " this.largestNumOfValues: " |
| + this.largestNumOfValues); |
| } |
| return retv; |
| } |
| |
| public void reduce(Object key, Iterator values, |
| OutputCollector output, Reporter reporter) throws IOException { |
| if (this.reporter == null) { |
| this.reporter = reporter; |
| } |
| |
| SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter); |
| Object[] tags = groups.keySet().toArray(); |
| ResetableIterator[] groupValues = new ResetableIterator[tags.length]; |
| for (int i = 0; i < tags.length; i++) { |
| groupValues[i] = groups.get(tags[i]); |
| } |
| joinAndCollect(tags, groupValues, key, output, reporter); |
| addLongValue("groupCount", 1); |
| for (int i = 0; i < tags.length; i++) { |
| groupValues[i].close(); |
| } |
| } |
| |
| /** |
| * The subclass can overwrite this method to perform additional filtering |
| * and/or other processing logic before a value is collected. |
| * |
| * @param key |
| * @param aRecord |
| * @param output |
| * @param reporter |
| * @throws IOException |
| */ |
| protected void collect(Object key, TaggedMapOutput aRecord, |
| OutputCollector output, Reporter reporter) throws IOException { |
| this.collected += 1; |
| addLongValue("collectedCount", 1); |
| if (aRecord != null) { |
| output.collect(key, aRecord.getData()); |
| reporter.setStatus("key: " + key.toString() + " collected: " + collected); |
| addLongValue("actuallyCollectedCount", 1); |
| } |
| } |
| |
| /** |
| * join the list of the value lists, and collect the results. |
| * |
| * @param tags |
| * a list of input tags |
| * @param values |
| * a list of value lists, each corresponding to one input source |
| * @param key |
| * @param output |
| * @throws IOException |
| */ |
| private void joinAndCollect(Object[] tags, ResetableIterator[] values, |
| Object key, OutputCollector output, Reporter reporter) |
| throws IOException { |
| if (values.length < 1) { |
| return; |
| } |
| Object[] partialList = new Object[values.length]; |
| joinAndCollect(tags, values, 0, partialList, key, output, reporter); |
| } |
| |
| /** |
| * Perform the actual join recursively. |
| * |
| * @param tags |
| * a list of input tags |
| * @param values |
| * a list of value lists, each corresponding to one input source |
| * @param pos |
| * indicating the next value list to be joined |
| * @param partialList |
| * a list of values, each from one value list considered so far. |
| * @param key |
| * @param output |
| * @throws IOException |
| */ |
| private void joinAndCollect(Object[] tags, ResetableIterator[] values, |
| int pos, Object[] partialList, Object key, |
| OutputCollector output, Reporter reporter) throws IOException { |
| |
| if (values.length == pos) { |
| // get a value from each source. Combine them |
| TaggedMapOutput combined = combine(tags, partialList); |
| collect(key, combined, output, reporter); |
| return; |
| } |
| ResetableIterator nextValues = values[pos]; |
| nextValues.reset(); |
| while (nextValues.hasNext()) { |
| Object v = nextValues.next(); |
| partialList[pos] = v; |
| joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter); |
| } |
| } |
| |
| public static Text SOURCE_TAGS_FIELD = new Text("SOURCE_TAGS"); |
| |
| public static Text NUM_OF_VALUES_FIELD = new Text("NUM_OF_VALUES"); |
| |
| /** |
| * |
| * @param tags |
| * a list of source tags |
| * @param values |
| * a value per source |
| * @return combined value derived from values of the sources |
| */ |
| protected abstract TaggedMapOutput combine(Object[] tags, Object[] values); |
| |
| public void map(Object arg0, Object arg1, OutputCollector arg2, |
| Reporter arg3) throws IOException { |
| // TODO Auto-generated method stub |
| |
| } |
| } |