blob: 16e964559d781c3a6434d4f63d76990294f53331 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.broadcast.input;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.api.KVReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
/**
 * A {@link KVReader} that iterates over the key/value records of the inputs
 * handed out by a {@link BroadcastShuffleManager}.
 *
 * <p>Inputs are consumed one after another: when the reader for the current
 * input is exhausted it is closed, the input is freed, and the next input is
 * requested from the shuffle manager. Each record is exposed as a key together
 * with an {@link Iterable} holding exactly one value.
 *
 * @param <K> type the record keys are deserialized into
 * @param <V> type the record values are deserialized into
 */
public class BroadcastKVReader<K, V> implements KVReader {

  private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);

  private final BroadcastShuffleManager shuffleManager;
  private final Configuration conf;
  // Null when intermediate input is not compressed.
  private final CompressionCodec codec;

  private final Class<K> keyClass;
  private final Class<V> valClass;
  private final Deserializer<K> keyDeserializer;
  private final Deserializer<V> valDeserializer;
  // Buffers the IFile reader fills with the raw bytes of the current record.
  private final DataInputBuffer keyIn;
  private final DataInputBuffer valIn;

  private final SimpleValueIterator valueIterator;
  private final SimpleIterable valueIterable;

  private K key;
  private V value;

  // Both are null while no input is open (before the first input and after
  // the last one has been closed).
  private FetchedInput currentFetchedInput;
  private IFile.Reader currentReader;

  // Set once the deserializers have been bound to keyIn / valIn.
  private boolean deserializersOpened;

  /**
   * Creates a reader on top of the given shuffle manager.
   *
   * @param shuffleManager source of the fetched inputs to read
   * @param conf configuration used to look up the compression codec and the
   *          key / value classes and their serialization
   */
  public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
      Configuration conf) {
    this.shuffleManager = shuffleManager;
    this.conf = conf;

    if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
      Class<? extends CompressionCodec> codecClass = ConfigUtils
          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, conf);
    } else {
      codec = null;
    }

    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
    // The value class must come from the *value* setting; reading the key
    // setting here would deserialize values with the wrong type.
    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);

    this.keyIn = new DataInputBuffer();
    this.valIn = new DataInputBuffer();

    SerializationFactory serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.valDeserializer = serializationFactory.getDeserializer(valClass);

    this.valueIterator = new SimpleValueIterator();
    this.valueIterable = new SimpleIterable(this.valueIterator);
  }

  // TODO NEWTEZ Maybe add an interface to check whether next will block.

  /**
   * Moves to the next key/value(s) pair. May block while waiting for the next
   * input to become available.
   *
   * @return true if another key/value(s) pair exists, false if there are no
   *         more.
   * @throws IOException
   *           if an error occurs, or if the thread is interrupted while
   *           waiting for the next input
   */
  @Override
  public boolean next() throws IOException {
    if (readNextFromCurrentReader()) {
      return true;
    }
    // Current input is exhausted (or none is open yet): keep advancing until
    // an input yields a record or there are no inputs left.
    while (moveToNextInput()) {
      if (readNextFromCurrentReader()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns the record read by the last successful {@link #next()} call.
   *
   * <p>The returned values iterable is backed by a single shared iterator
   * that yields the current value once.
   *
   * @return the current key with its single value
   * @throws IOException declared by the interface; not thrown here
   */
  @SuppressWarnings("unchecked")
  @Override
  public KVRecord getCurrentKV() throws IOException {
    this.valueIterator.setValue(value);
    return new KVRecord((Object) key, (Iterable<Object>) this.valueIterable);
  }

  /**
   * Tries reading the next key and value from the current reader.
   *
   * @return true if a record was read, false if no reader is open or the
   *         current reader has no more records
   * @throws IOException if reading or deserializing the record fails
   */
  private boolean readNextFromCurrentReader() throws IOException {
    if (this.currentReader == null) {
      // No input has been opened yet, or the last one was already closed.
      return false;
    }
    if (!this.currentReader.nextRawKey(keyIn)) {
      return false;
    }
    this.currentReader.nextRawValue(valIn);
    openDeserializersIfNeeded();
    this.key = keyDeserializer.deserialize(this.key);
    this.value = valDeserializer.deserialize(this.value);
    return true;
  }

  /**
   * Binds the key / value deserializers to their input buffers on first use.
   * A deserializer has to be opened on its stream before deserialize() may be
   * called. This is done lazily so that the constructor does not have to
   * declare IOException.
   *
   * @throws IOException if opening a deserializer fails
   */
  private void openDeserializersIfNeeded() throws IOException {
    if (!deserializersOpened) {
      keyDeserializer.open(keyIn);
      valDeserializer.open(valIn);
      deserializersOpened = true;
    }
  }

  /**
   * Moves to the next available input. This method may block if the input is
   * not ready yet. Also takes care of closing the previous input.
   *
   * @return true if the next input exists, false otherwise
   * @throws IOException
   *           if closing the previous input or opening the next one fails, or
   *           if the thread is interrupted while waiting (the interrupt status
   *           is restored before throwing)
   */
  private boolean moveToNextInput() throws IOException {
    if (currentReader != null) {
      // Detach the finished input first so that a later call can neither read
      // from the closed reader nor close / free it a second time.
      IFile.Reader finishedReader = currentReader;
      FetchedInput finishedInput = currentFetchedInput;
      currentReader = null;
      currentFetchedInput = null;
      try {
        finishedReader.close();
      } finally {
        // Release the input even if closing the reader failed.
        finishedInput.free();
      }
    }

    FetchedInput nextInput;
    try {
      nextInput = shuffleManager.getNextInput();
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting for next available input", e);
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }

    if (nextInput == null) {
      return false; // No more inputs
    }
    IFile.Reader nextReader = openIFileReader(nextInput);
    currentFetchedInput = nextInput;
    currentReader = nextReader;
    return true;
  }

  /**
   * Opens an IFile reader over the given fetched input.
   *
   * @param fetchedInput the input to read; inputs of type
   *          {@link Type#MEMORY} are read directly from their byte array, all
   *          others through their input stream using the configured codec
   * @return a reader positioned at the start of the input
   * @throws IOException if the reader cannot be created
   */
  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
      throws IOException {
    if (fetchedInput.getType() == Type.MEMORY) {
      MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
      return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
          mfi.getBytes(), 0, (int) mfi.getSize());
    } else {
      return new IFile.Reader(conf, fetchedInput.getInputStream(),
          fetchedInput.getSize(), codec, null);
    }
  }

  // TODO NEWTEZ Move this into a common class. Also used in MRInput
  /**
   * Iterator over at most one value. The value is handed out once and then
   * cleared; a new value is supplied through {@link #setValue(Object)}.
   */
  private class SimpleValueIterator implements Iterator<V> {

    private V value;

    /** Sets the single value this iterator will return next. */
    public void setValue(V value) {
      this.value = value;
    }

    @Override
    public boolean hasNext() {
      return value != null;
    }

    /**
     * Returns the pending value and clears it. Returns null (rather than
     * throwing) when no value is pending.
     */
    @Override
    public V next() {
      V value = this.value;
      this.value = null;
      return value;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Iterable that always hands out the same iterator instance it was created
   * with.
   */
  private class SimpleIterable implements Iterable<V> {

    private final Iterator<V> iterator;

    public SimpleIterable(Iterator<V> iterator) {
      this.iterator = iterator;
    }

    @Override
    public Iterator<V> iterator() {
      return iterator;
    }
  }
}