/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.sort.impl;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.ReflectionUtils;

import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
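
/**
 * Base class for sorters that accept a stream of key/value pairs, partition
 * and sort them, and make the result available as intermediate output. The
 * expected lifecycle is: {@link #initialize}, repeated {@link #write} calls,
 * then {@link #flush} followed by {@link #close}.
 */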
@SuppressWarnings({"unchecked", "rawtypes"})
public abstract class ExternalSorter {

  private static final Log LOG = LogFactory.getLog(ExternalSorter.class);

  /** Releases any resources held by the sorter. */
  public abstract void close() throws IOException;

  /** Flushes all buffered records so the final sorted output is produced. */
  public abstract void flush() throws IOException;

  /** Accepts a key/value pair to be partitioned and sorted. */
  public abstract void write(Object key, Object value) throws IOException;

  protected Progressable nullProgressable = new NullProgressable();
  protected TezOutputContext outputContext;
  protected Combiner combiner;
  protected Partitioner partitioner;
  protected Configuration conf;
  protected FileSystem rfs;
  protected TezTaskOutput mapOutputFile;
  protected int partitions;
  protected Class keyClass;
  protected Class valClass;
  protected RawComparator comparator;
  protected SerializationFactory serializationFactory;
  protected Serializer keySerializer;
  protected Serializer valSerializer;
  protected IndexedSorter sorter;

  // Compression for map-outputs
  protected CompressionCodec codec;

  // Counters
  // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
  protected TezCounter mapOutputByteCounter;
  protected TezCounter mapOutputRecordCounter;
  protected TezCounter fileOutputByteCounter;
  protected TezCounter spilledRecordsCounter;
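
  /**
   * Sets up the sorter from the configuration: raw local filesystem, internal
   * sort implementation, key/value serialization, counters, optional
   * intermediate-output compression codec, partitioner and combiner.
   */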
  public void initialize(TezOutputContext outputContext, Configuration conf,
      int numOutputs) throws IOException {
    this.outputContext = outputContext;
    this.conf = conf;
    this.partitions = numOutputs;

    rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();

    // sorter
    sorter = ReflectionUtils.newInstance(this.conf.getClass(
        TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, QuickSort.class,
        IndexedSorter.class), this.conf);

    comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);

    // k/v serialization
    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
    serializationFactory = new SerializationFactory(this.conf);
    keySerializer = serializationFactory.getSerializer(keyClass);
    valSerializer = serializationFactory.getSerializer(valClass);

    // counters
    mapOutputByteCounter =
        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
    mapOutputRecordCounter =
        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
    fileOutputByteCounter =
        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
    spilledRecordsCounter =
        outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);

    // compression
    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
      Class<? extends CompressionCodec> codecClass =
          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, this.conf);
    } else {
      codec = null;
    }

    // Task outputs
    mapOutputFile = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext);

    LOG.info("Instantiating Partitioner: ["
        + conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS) + "]");
    this.conf.setInt(TezJobConfig.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions);
    this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
    this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
  }
  /**
   * Exception indicating that the allocated sort buffer is insufficient to hold
   * the current record.
   */
  @SuppressWarnings("serial")
  public static class MapBufferTooSmallException extends IOException {
    public MapBufferTooSmallException(String s) {
      super(s);
    }
  }
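
  /**
   * @return the {@link TezTaskOutput} used to locate this task's local output
   *         and spill files
   */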
  @Private
  public TezTaskOutput getMapOutput() {
    return mapOutputFile;
  }
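
  /**
   * Runs the configured combiner over the records exposed by the given
   * iterator and writes the combined output to the supplied {@link Writer}.
   */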
  protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
      Writer writer) throws IOException {
    try {
      combiner.combine(kvIter, writer);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }
  /**
   * Rename srcPath to dstPath on the same volume. This is the same as
   * RawLocalFileSystem's rename method, except that it will not fall back to a
   * copy, and it will create the target directory if it doesn't exist.
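   *
   * <p>For example (illustrative paths only, not mandated by this class):
   * <pre>{@code
   *   // move a completed spill file to its final location on the same volume
   *   sameVolRename(new Path(localDir, "spill_0.out"),
   *       new Path(localDir, "output/file.out"));
   * }</pre>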
   */
  protected void sameVolRename(Path srcPath, Path dstPath) throws IOException {
    RawLocalFileSystem rfs = (RawLocalFileSystem) this.rfs;
    File src = rfs.pathToFile(srcPath);
    File dst = rfs.pathToFile(dstPath);
    if (!dst.getParentFile().exists()) {
      if (!dst.getParentFile().mkdirs()) {
        throw new IOException("Unable to rename " + src + " to " + dst
            + ": couldn't create parent directory");
      }
    }
    if (!src.renameTo(dst)) {
      throw new IOException("Unable to rename " + src + " to " + dst);
    }
  }
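
  /**
   * Returns a stream over the sorted output of the given partition. The base
   * implementation does not support this; subclasses that can serve their
   * output directly override it.
   */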
  public InputStream getSortedStream(int partition) {
    throw new UnsupportedOperationException("getSortedStream isn't supported!");
  }
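
  /**
   * Returns the {@link ShuffleHeader} for the given partition. The base
   * implementation does not support this; subclasses that can serve their
   * output directly override it.
   */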
  public ShuffleHeader getShuffleHeader(int reduce) {
    throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
  }
}