| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.lib.join; |
| |
| import java.io.IOException; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.io.WritableComparator; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.util.ReflectionUtils; |
| |
| /** |
| * Proxy class for a RecordReader participating in the join framework. |
| * |
| * This class keeps track of the "head" key-value pair for the |
| * provided RecordReader and keeps a store of values matching a key when |
| * this source is participating in a join. |
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Stable |
| public class WrappedRecordReader<K extends WritableComparable<?>, |
| U extends Writable> extends ComposableRecordReader<K,U> { |
| |
| protected boolean empty = false; |
| private RecordReader<K,U> rr; |
| private int id; // index at which values will be inserted in collector |
| |
| protected WritableComparator cmp = null; |
| private K key; // key at the top of this RR |
| private U value; // value assoc with key |
| private ResetableIterator<U> vjoin; |
| private Configuration conf = new Configuration(); |
| @SuppressWarnings("unchecked") |
| private Class<? extends WritableComparable> keyclass = null; |
| private Class<? extends Writable> valueclass = null; |
| |
| protected WrappedRecordReader(int id) { |
| this.id = id; |
| vjoin = new StreamBackedIterator<U>(); |
| } |
| |
| /** |
| * For a given RecordReader rr, occupy position id in collector. |
| */ |
| WrappedRecordReader(int id, RecordReader<K,U> rr, |
| Class<? extends WritableComparator> cmpcl) |
| throws IOException, InterruptedException { |
| this.id = id; |
| this.rr = rr; |
| if (cmpcl != null) { |
| try { |
| this.cmp = cmpcl.newInstance(); |
| } catch (InstantiationException e) { |
| throw new IOException(e); |
| } catch (IllegalAccessException e) { |
| throw new IOException(e); |
| } |
| } |
| vjoin = new StreamBackedIterator<U>(); |
| } |
| |
| public void initialize(InputSplit split, |
| TaskAttemptContext context) |
| throws IOException, InterruptedException { |
| rr.initialize(split, context); |
| conf = context.getConfiguration(); |
| nextKeyValue(); |
| if (!empty) { |
| keyclass = key.getClass().asSubclass(WritableComparable.class); |
| valueclass = value.getClass(); |
| if (cmp == null) { |
| cmp = WritableComparator.get(keyclass); |
| } |
| } |
| } |
| |
| /** |
| * Request new key from proxied RR. |
| */ |
| @SuppressWarnings("unchecked") |
| public K createKey() { |
| if (keyclass != null) { |
| return (K) ReflectionUtils.newInstance(keyclass, conf); |
| } |
| return (K) NullWritable.get(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public U createValue() { |
| if (valueclass != null) { |
| return (U) ReflectionUtils.newInstance(valueclass, conf); |
| } |
| return (U) NullWritable.get(); |
| } |
| |
| /** {@inheritDoc} */ |
| public int id() { |
| return id; |
| } |
| |
| /** |
| * Return the key at the head of this RR. |
| */ |
| public K key() { |
| return key; |
| } |
| |
| /** |
| * Clone the key at the head of this RR into the object supplied. |
| */ |
| public void key(K qkey) throws IOException { |
| ReflectionUtils.copy(conf, key, qkey); |
| } |
| |
| /** |
| * Return true if the RR- including the k,v pair stored in this object- |
| * is exhausted. |
| */ |
| public boolean hasNext() { |
| return !empty; |
| } |
| |
| /** |
| * Skip key-value pairs with keys less than or equal to the key provided. |
| */ |
| public void skip(K key) throws IOException, InterruptedException { |
| if (hasNext()) { |
| while (cmp.compare(key(), key) <= 0 && next()); |
| } |
| } |
| |
| /** |
| * Add an iterator to the collector at the position occupied by this |
| * RecordReader over the values in this stream paired with the key |
| * provided (ie register a stream of values from this source matching K |
| * with a collector). |
| */ |
| @SuppressWarnings("unchecked") |
| public void accept(CompositeRecordReader.JoinCollector i, K key) |
| throws IOException, InterruptedException { |
| vjoin.clear(); |
| if (key() != null && 0 == cmp.compare(key, key())) { |
| do { |
| vjoin.add(value); |
| } while (next() && 0 == cmp.compare(key, key())); |
| } |
| i.add(id, vjoin); |
| } |
| |
| /** |
| * Read the next k,v pair into the head of this object; return true iff |
| * the RR and this are exhausted. |
| */ |
| public boolean nextKeyValue() throws IOException, InterruptedException { |
| if (hasNext()) { |
| next(); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Read the next k,v pair into the head of this object; return true iff |
| * the RR and this are exhausted. |
| */ |
| private boolean next() throws IOException, InterruptedException { |
| empty = !rr.nextKeyValue(); |
| key = rr.getCurrentKey(); |
| value = rr.getCurrentValue(); |
| return !empty; |
| } |
| |
| /** |
| * Get current key |
| */ |
| public K getCurrentKey() throws IOException, InterruptedException { |
| return rr.getCurrentKey(); |
| } |
| |
| /** |
| * Get current value |
| */ |
| public U getCurrentValue() throws IOException, InterruptedException { |
| return rr.getCurrentValue(); |
| } |
| |
| /** |
| * Request progress from proxied RR. |
| */ |
| public float getProgress() throws IOException, InterruptedException { |
| return rr.getProgress(); |
| } |
| |
| /** |
| * Forward close request to proxied RR. |
| */ |
| public void close() throws IOException { |
| rr.close(); |
| } |
| |
| /** |
| * Implement Comparable contract (compare key at head of proxied RR |
| * with that of another). |
| */ |
| public int compareTo(ComposableRecordReader<K,?> other) { |
| return cmp.compare(key(), other.key()); |
| } |
| |
| /** |
| * Return true iff compareTo(other) retn true. |
| */ |
| @SuppressWarnings("unchecked") // Explicit type check prior to cast |
| public boolean equals(Object other) { |
| return other instanceof ComposableRecordReader |
| && 0 == compareTo((ComposableRecordReader)other); |
| } |
| |
| public int hashCode() { |
| assert false : "hashCode not designed"; |
| return 42; |
| } |
| } |