| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tez.mapreduce.input; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileSystem.Statistics; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.serializer.Deserializer; |
| import org.apache.hadoop.io.serializer.SerializationFactory; |
| import org.apache.hadoop.mapred.FileSplit; |
| import org.apache.hadoop.mapred.InputFormat; |
| import org.apache.hadoop.mapred.InputSplit; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobContext; |
| import org.apache.hadoop.mapred.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; |
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.tez.common.TezUtils; |
| import org.apache.tez.common.counters.TaskCounter; |
| import org.apache.tez.common.counters.TezCounter; |
| import org.apache.tez.mapreduce.common.Utils; |
| import org.apache.tez.mapreduce.hadoop.mapred.MRReporter; |
| import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.Input; |
| import org.apache.tez.runtime.api.LogicalInput; |
| import org.apache.tez.runtime.api.TezInputContext; |
| import org.apache.tez.runtime.library.api.KVReader; |
| |
| import com.google.common.base.Preconditions; |
| |
| /** |
 * {@link MRInput} is an {@link Input} which provides key/value pairs
 * to the consumer.
| * |
| * It is compatible with all standard Apache Hadoop MapReduce |
| * {@link InputFormat} implementations. |
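 *
 * A minimal consumption sketch (illustrative only; it assumes the
 * {@link KVReader.KVRecord} returned by the reader exposes
 * {@code getKey()} and {@code getValues()}):
 * <pre>
 * // mrInput obtained from the hosting processor's initialized inputs
 * KVReader reader = mrInput.getReader();
 * while (reader.next()) {
 *   KVReader.KVRecord record = reader.getCurrentKV();
 *   Object key = record.getKey();
 *   for (Object value : record.getValues()) {
 *     // process the key/value pair
 *   }
 * }
 * </pre>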
| */ |
| public class MRInput implements LogicalInput { |
| |
| private static final Log LOG = LogFactory.getLog(MRInput.class); |
| |
| |
| private TezInputContext inputContext; |
| |
| private JobConf jobConf; |
| private Configuration incrementalConf; |
| private boolean recordReaderCreated = false; |
| |
| boolean useNewApi; |
| |
| org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext; |
| |
| @SuppressWarnings("rawtypes") |
| private org.apache.hadoop.mapreduce.InputFormat newInputFormat; |
| @SuppressWarnings("rawtypes") |
| private org.apache.hadoop.mapreduce.RecordReader newRecordReader; |
| protected org.apache.hadoop.mapreduce.InputSplit newInputSplit; |
| |
| @SuppressWarnings("rawtypes") |
| private InputFormat oldInputFormat; |
| @SuppressWarnings("rawtypes") |
| protected RecordReader oldRecordReader; |
| |
| protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex(); |
| |
| private TezCounter inputRecordCounter; |
| private TezCounter fileInputByteCounter; |
| private List<Statistics> fsStats; |
| |
| @Override |
| public List<Event> initialize(TezInputContext inputContext) throws IOException { |
| this.inputContext = inputContext; |
| Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload()); |
| this.jobConf = new JobConf(conf); |
| |
| // Read split information. |
| TaskSplitMetaInfo[] allMetaInfo = readSplits(conf); |
| TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()]; |
| this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(), |
| thisTaskMetaInfo.getStartOffset()); |
| |
| // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in |
| // theory, can be used by the MapProcessor, ReduceProcessor or a custom |
| // processor. (The processor could provide the counter though) |
| this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS); |
| this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ); |
| |
| useNewApi = this.jobConf.getUseNewMapper(); |
| |
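    // Set up the InputFormat and RecordReader against whichever MapReduce API
    // (mapred or mapreduce) the job is configured to use.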
| if (useNewApi) { |
| TaskAttemptContext taskAttemptContext = createTaskAttemptContext(); |
| Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz; |
| try { |
| inputFormatClazz = taskAttemptContext.getInputFormatClass(); |
| } catch (ClassNotFoundException e) { |
        throw new IOException("Unable to load InputFormat class", e);
| } |
| |
| newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf); |
| |
| newInputSplit = getNewSplitDetails(splitMetaInfo); |
| |
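      // Track FileSystem statistics for the split's path so that byte
      // counters can be computed as deltas around read operations.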
| List<Statistics> matchedStats = null; |
| if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) { |
| matchedStats = Utils.getFsStatistics( |
| ((org.apache.hadoop.mapreduce.lib.input.FileSplit) |
| newInputSplit).getPath(), this.jobConf); |
| } |
| fsStats = matchedStats; |
| |
| try { |
| newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext); |
| newRecordReader.initialize(newInputSplit, taskAttemptContext); |
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while creating record reader", e);
      }
| } else { // OLD API |
| oldInputFormat = this.jobConf.getInputFormat(); |
      InputSplit oldInputSplit = getOldSplitDetails(splitMetaInfo);

| List<Statistics> matchedStats = null; |
| if (oldInputSplit instanceof FileSplit) { |
| matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf); |
| } |
| fsStats = matchedStats; |
| |
| long bytesInPrev = getInputBytes(); |
| oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit, |
| this.jobConf, new MRReporter(inputContext, oldInputSplit)); |
| long bytesInCurr = getInputBytes(); |
| fileInputByteCounter.increment(bytesInCurr - bytesInPrev); |
| setIncrementalConfigParams(oldInputSplit); |
| } |
| return null; |
| } |
| |
| @Override |
| public KVReader getReader() throws IOException { |
    Preconditions.checkState(!recordReaderCreated,
        "Only a single instance of record reader can be created for this input.");
| recordReaderCreated = true; |
| return new MRInputKVReader(); |
| } |
| |
| |
| @Override |
| public void handleEvents(List<Event> inputEvents) { |
| // Not expecting any events at the moment. |
| } |
| |
| |
| @Override |
| public void setNumPhysicalInputs(int numInputs) { |
| // Not required at the moment. May be required if splits are sent via events. |
| } |
| |
| @Override |
| public List<Event> close() throws IOException { |
| long bytesInPrev = getInputBytes(); |
| if (useNewApi) { |
| newRecordReader.close(); |
| } else { |
| oldRecordReader.close(); |
| } |
| long bytesInCurr = getInputBytes(); |
| fileInputByteCounter.increment(bytesInCurr - bytesInPrev); |
| |
| return null; |
| } |
| |
| /** |
   * {@link MRInput} sets some additional configuration parameters, such as
   * the input file for the current split, which would normally be provided by
   * the MapReduce framework. This method returns those additional updates,
   * and should be used by Processors using the old MapReduce API with
   * {@link MRInput}.
   *
   * @return the additional fields set by {@link MRInput}, or null if no
   *         updates were recorded
| */ |
  public Configuration getConfigUpdates() {
    if (incrementalConf != null) {
      return new Configuration(incrementalConf);
    }
    return null;
  }
| |
| public float getProgress() throws IOException, InterruptedException { |
| if (useNewApi) { |
| return newRecordReader.getProgress(); |
| } else { |
| return oldRecordReader.getProgress(); |
| } |
| } |
| |
| |
| private TaskAttemptContext createTaskAttemptContext() { |
| return new TaskAttemptContextImpl(this.jobConf, inputContext, true); |
| } |
| |
| |
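  // An Iterator over a single value. The value is replaced before each record
  // is handed out, so one instance can be reused for the lifetime of a reader.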
| private static class SimpleValueIterator implements Iterator<Object> { |
| |
| private Object value; |
| |
| public void setValue(Object value) { |
| this.value = value; |
| } |
| |
| public boolean hasNext() { |
| return value != null; |
| } |
| |
| public Object next() { |
| Object value = this.value; |
| this.value = null; |
| return value; |
| } |
| |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| private static class SimpleIterable implements Iterable<Object> { |
| private final Iterator<Object> iterator; |
| public SimpleIterable(Iterator<Object> iterator) { |
| this.iterator = iterator; |
| } |
| |
| @Override |
| public Iterator<Object> iterator() { |
| return iterator; |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo) |
| throws IOException { |
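    // The serialized split bytes live in a split file on the local
    // filesystem; the meta info only records the file location and offset.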
| Path file = new Path(splitMetaInfo.getSplitLocation()); |
| FileSystem fs = FileSystem.getLocal(jobConf); |
| file = fs.makeQualified(file); |
    LOG.info("Reading input split file from: " + file);
| long offset = splitMetaInfo.getStartOffset(); |
| |
| FSDataInputStream inFile = fs.open(file); |
| inFile.seek(offset); |
| String className = Text.readString(inFile); |
| Class<org.apache.hadoop.mapred.InputSplit> cls; |
| try { |
| cls = |
| (Class<org.apache.hadoop.mapred.InputSplit>) |
| jobConf.getClassByName(className); |
    } catch (ClassNotFoundException ce) {
      throw new IOException("Split class " + className + " not found", ce);
    }
| SerializationFactory factory = new SerializationFactory(jobConf); |
| Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = |
| (Deserializer<org.apache.hadoop.mapred.InputSplit>) |
| factory.getDeserializer(cls); |
| deserializer.open(inFile); |
| org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null); |
| long pos = inFile.getPos(); |
| inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES) |
| .increment(pos - offset); |
| inFile.close(); |
| return split; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails( |
| TaskSplitIndex splitMetaInfo) throws IOException { |
| Path file = new Path(splitMetaInfo.getSplitLocation()); |
| long offset = splitMetaInfo.getStartOffset(); |
| |
| // Split information read from local filesystem. |
| FileSystem fs = FileSystem.getLocal(jobConf); |
| file = fs.makeQualified(file); |
    LOG.info("Reading input split file from: " + file);
| FSDataInputStream inFile = fs.open(file); |
| inFile.seek(offset); |
| String className = Text.readString(inFile); |
| Class<org.apache.hadoop.mapreduce.InputSplit> cls; |
| try { |
| cls = |
| (Class<org.apache.hadoop.mapreduce.InputSplit>) |
| jobConf.getClassByName(className); |
    } catch (ClassNotFoundException ce) {
      throw new IOException("Split class " + className + " not found", ce);
    }
| SerializationFactory factory = new SerializationFactory(jobConf); |
| Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = |
| (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) |
| factory.getDeserializer(cls); |
| deserializer.open(inFile); |
| org.apache.hadoop.mapreduce.InputSplit split = |
| deserializer.deserialize(null); |
| long pos = inFile.getPos(); |
| inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES) |
| .increment(pos - offset); |
| inFile.close(); |
| return split; |
| } |
| |
| private void setIncrementalConfigParams(InputSplit inputSplit) { |
| if (inputSplit instanceof FileSplit) { |
| FileSplit fileSplit = (FileSplit) inputSplit; |
| this.incrementalConf = new Configuration(false); |
| |
| this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath() |
| .toString()); |
| this.incrementalConf.setLong(JobContext.MAP_INPUT_START, |
| fileSplit.getStart()); |
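      // Note: despite its name, JobContext.MAP_INPUT_PATH is the Hadoop
      // constant for "mapreduce.map.input.length", so this records the
      // split length.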
| this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH, |
| fileSplit.getLength()); |
| } |
| LOG.info("Processing split: " + inputSplit); |
| } |
| |
| private long getInputBytes() { |
| if (fsStats == null) return 0; |
| long bytesRead = 0; |
| for (Statistics stat: fsStats) { |
| bytesRead = bytesRead + stat.getBytesRead(); |
| } |
| return bytesRead; |
| } |
| |
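  /**
   * Reads the split meta information for all tasks of the vertex from the
   * local filesystem, where the split meta file is expected to have been
   * localized.
   */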
| protected TaskSplitMetaInfo[] readSplits(Configuration conf) |
| throws IOException { |
| TaskSplitMetaInfo[] allTaskSplitMetaInfo; |
| allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf, |
| FileSystem.getLocal(conf)); |
| return allTaskSplitMetaInfo; |
| } |
| |
| private class MRInputKVReader implements KVReader { |
| |
| Object key; |
| Object value; |
| |
    // Set up the value iterator once; the value is set on the same object for
    // each record to prevent lots of objects being created.
    private SimpleValueIterator valueIterator = new SimpleValueIterator();
    private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
| |
| private final boolean localNewApi; |
| |
| MRInputKVReader() { |
| localNewApi = useNewApi; |
| if (!localNewApi) { |
| key = oldRecordReader.createKey(); |
        value = oldRecordReader.createValue();
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public boolean next() throws IOException { |
| boolean hasNext = false; |
| long bytesInPrev = getInputBytes(); |
| if (localNewApi) { |
| try { |
| hasNext = newRecordReader.nextKeyValue(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Interrupted while checking for next key-value", e); |
| } |
| } else { |
| hasNext = oldRecordReader.next(key, value); |
| } |
| long bytesInCurr = getInputBytes(); |
| fileInputByteCounter.increment(bytesInCurr - bytesInPrev); |
| |
| if (hasNext) { |
| inputRecordCounter.increment(1); |
| } |
| |
| return hasNext; |
| } |
| |
| @Override |
| public KVRecord getCurrentKV() throws IOException { |
| KVRecord kvRecord = null; |
| if (localNewApi) { |
| try { |
| valueIterator.setValue(newRecordReader.getCurrentValue()); |
| kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
          throw new IOException("Interrupted while fetching current key-value", e);
| } |
| |
| } else { |
| valueIterator.setValue(value); |
| kvRecord = new KVRecord(key, valueIterable); |
| } |
| return kvRecord; |
| } |
| }; |
| } |