blob: b70d6c43607543c5c9371ce3791f75e1ff95684f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.tez.runtime.library.common.sort.impl;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.tez.common.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.library.common.comparator.ProxyComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
import org.apache.tez.runtime.library.utils.LocalProgress;
import org.apache.tez.util.StopWatch;
import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
// PipelinedSorter: an ExternalSorter that serializes records into a list of ByteBuffer
// blocks, hands filled "spans" to a sort thread pool (sortmaster, see sort()) and spills
// the sorted data to disk; with final merge enabled, flush() combines the spills into a
// single output file and index.
// NOTE(review): in this copy, Javadoc bodies (lines starting with "*") have lost their
// /** */ delimiters, and brace-only lines, annotations and several statements are
// missing; restore from upstream before compiling.
@SuppressWarnings({"unchecked", "rawtypes"})
public class PipelinedSorter extends ExternalSorter {
private static final Logger LOG = LoggerFactory.getLogger(PipelinedSorter.class);
* The size of each record in the index file for the map-outputs.
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
// Per-partition allowance (bytes) added to the size estimate passed to
// mapOutputFile.getSpillFileForWrite() in spill().
private final static int APPROX_HEADER_LENGTH = 150;
// bitcount(partitions) + 1: width of the partition field that collect() packs into the
// high bits of the hash-comparator prefix.
private final int partitionBits;
// Layout of the NMETA ints stored per record in SortSpan.kvmeta.
private static final int PARTITION = 0; // partition offset in acct
private static final int KEYSTART = 1; // key offset in acct
private static final int VALSTART = 2; // val offset in acct
private static final int VALLEN = 3; // val len in acct
private static final int NMETA = 4; // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
// In flush(), the combiner is skipped when there are fewer spills than this.
private final int minSpillsForCombine;
// Non-null only when the configured comparator is a ProxyComparator (used for key prefixes).
private final ProxyComparator hasher;
// SortSpans
// The span currently receiving records in collect().
private SortSpan span;
//total memory capacity allocated to sorter
// (sum of all blocks' usable sizes, each rounded down to a multiple of METASIZE)
private final long capacity;
//track buffer overflow recursively in all buffers
private int bufferOverflowRecursion;
// Merger
private final SpanMerger merger;
// Thread pool that runs SortTasks for spans queued by sort().
private final ExecutorService sortmaster;
// Spill index records, looked up by spill number (see sendPipelinedShuffleEvents()/flush()).
private final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
// True only when final merge is disabled and pipelined shuffle is configured.
private final boolean pipelinedShuffle;
// Bytes of the memory budget not yet handed out to a block by allocateSpace().
private long currentAllocatableMemory;
//Maintain a list of ByteBuffers
final List<ByteBuffer> buffers;
// Per-block counter, incremented in sort() each time a block is reused for a new span.
List<Integer> bufferUsage;
// Number of blocks the constructor's sizing loop computed for the budget.
final int maxNumberOfBlocks;
// Index into buffers of the block currently in use; -1 before any block is in use.
private int bufferIndex = -1;
private final int MIN_BLOCK_SIZE;
private final boolean lazyAllocateMem;
// Passed to ShuffleUtils.generateEventOnSpill() when building shuffle events.
private final Deflater deflater;
// Passed to ShuffleUtils.generateEventOnSpill() when building shuffle events.
private final String auxiliaryService;
* Store the events to be send in close.
private final List<Event> finalEvents;
// TODO Set additional counters - total bytes written, spills etc.
// Builds the sorter: reads block-size and pipelining configuration, splits the memory
// budget (availableMemoryMb, in MB) into ByteBuffer blocks, allocates the first block
// (or every block when lazy allocation is off), opens the first SortSpan and starts the
// sortmaster thread pool.
// NOTE(review): several statements in this copy are truncated — the getBoolean/getInt
// continuations, the Preconditions call checking minBlockSize, the sortThreads
// initializer and the ThreadFactoryBuilder chain — and numBlocks is never incremented in
// the sizing loop, so maxNumberOfBlocks stays 0 as shown. Restore from upstream.
public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
super(outputContext, conf, numOutputs, initialMemoryAvailable);
lazyAllocateMem = this.conf.getBoolean(TezRuntimeConfiguration
if (lazyAllocateMem) {
* When lazy-allocation is enabled, framework takes care of auto
* allocating memory on need basis. Desirable block size is set to 256MB
//256 MB - 64 bytes. See comment for the 32MB allocation.
MIN_BLOCK_SIZE = ((256 << 20) - 64);
} else {
int minBlockSize = conf.getInt(TezRuntimeConfiguration
(minBlockSize > 0 && minBlockSize < 2047),
+ "=" + minBlockSize + " should be a positive value between 0 and 2047");
// The configured value is in MB; convert to bytes.
MIN_BLOCK_SIZE = minBlockSize << 20;
StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
.append(outputContext.getDestinationVertexName()).append(": ");
// Bits reserved for the partition number in the key prefix built by collect().
partitionBits = bitcount(partitions)+1;
boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
// Pipelined shuffle is only used when there is no final merge.
pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
//sanity checks
final long sortmb = this.availableMemoryMb;
// buffers and accounting
// MB -> bytes
long maxMemLimit = sortmb << 20;
initialSetupLogLine.append(", UsingHashComparator=");
// k/v serialization
// A ProxyComparator lets collect() derive an int prefix from each key.
if(comparator instanceof ProxyComparator) {
hasher = (ProxyComparator)comparator;
} else {
hasher = null;
// Pre-compute how the budget splits into blocks; each block's usable size is rounded
// down to a multiple of METASIZE and summed into the sorter's capacity.
long totalCapacityWithoutMeta = 0;
long availableMem = maxMemLimit;
int numBlocks = 0;
while(availableMem > 0) {
long size = Math.min(availableMem, computeBlockSize(availableMem, maxMemLimit));
int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
totalCapacityWithoutMeta += sizeWithoutMeta;
availableMem -= size;
currentAllocatableMemory = maxMemLimit;
maxNumberOfBlocks = numBlocks;
capacity = totalCapacityWithoutMeta;
buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
bufferUsage = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
allocateSpace(); //Allocate the first block
// Without lazy allocation, allocate every remaining block now (until allocateSpace() returns null).
if (!lazyAllocateMem) {"Pre allocating rest of memory buffers upfront");
while(allocateSpace() != null);
initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit);
initialSetupLogLine.append(", lazyAllocateMem=").append(
initialSetupLogLine.append(", minBlockSize=").append(MIN_BLOCK_SIZE);
initialSetupLogLine.append(", initial BLOCK_SIZE=").append(buffers.get(0).capacity());
initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
Preconditions.checkState(buffers.size() > 0, "Atleast one buffer needs to be present");;
// First span: the current block, sized for up to 1M records of ~16 bytes each.
span = new SortSpan(buffers.get(bufferIndex), 1024 * 1024, 16, this.comparator);
merger = new SpanMerger(); // SpanIterators are comparable
final int sortThreads =
// Daemon threads named "Sorter {<vertex>} #N".
sortmaster = Executors.newFixedThreadPool(sortThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Sorter {" + TezUtilsInternal
.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
deflater = TezCommonUtils.newBestCompressionDeflater();
finalEvents = Lists.newLinkedList();
// Allocates the next heap buffer block from the remaining budget. Its size comes from
// computeBlockSize() and is rounded down to a multiple of METASIZE.
// Returns null once currentAllocatableMemory is exhausted.
// NOTE(review): the lines that presumably add `space` to buffers/bufferUsage and advance
// bufferIndex, and the LOG call that the "Newly allocated block" message belongs to, are
// missing from this copy; the checkState and the message both read buffers/bufferIndex.
ByteBuffer allocateSpace() {
if (currentAllocatableMemory <= 0) {
//No space available.
return null;
int size = computeBlockSize(currentAllocatableMemory, availableMemoryMb << 20);
// The full block size is debited, including the (size % METASIZE) bytes left unused.
currentAllocatableMemory -= size;
int sizeWithoutMeta = (size) - (size % METASIZE);
ByteBuffer space = ByteBuffer.allocate(sizeWithoutMeta);
Preconditions.checkState(buffers.size() <= maxNumberOfBlocks,
"Number of blocks " + buffers.size()
+ " is exceeding " + maxNumberOfBlocks);"Newly allocated block size=" + size
+ ", index=" + bufferIndex
+ ", Number of buffers=" + buffers.size()
+ ", currentAllocatableMemory=" + currentAllocatableMemory
+ ", currentBufferSize=" + space.capacity()
+ ", total=" + (availableMemoryMb << 20));
return space;
int computeBlockSize(long availableMem, long maxAllocatedMemory) {
int maxBlockSize = 0;
* When lazy-allocation is enabled, framework takes care of auto allocating
* memory on need basis. In such cases, first buffer starts with 32 MB.
if (lazyAllocateMem) {
if (buffers == null || buffers.isEmpty()) {
//32 MB - 64 bytes
// These buffers end up occupying 33554456 (32M + 24) bytes.
// On large JVMs (64G+), with G1Gc - the region size maxes out at
// 32M. Without the -64, this structure would end up using 2 regions.
return ((32 << 20) - 64);
maxBlockSize = Math.max(MIN_BLOCK_SIZE, maxBlockSize);
if (availableMem < maxBlockSize) {
maxBlockSize = (int) availableMem;
int maxMem = (maxAllocatedMemory > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxAllocatedMemory;
if (maxBlockSize > maxMem) {
maxBlockSize = maxMem;
availableMem -= maxBlockSize;
if (availableMem < MIN_BLOCK_SIZE) {
if ((maxBlockSize + availableMem) < Integer.MAX_VALUE) {
//Merge remaining with last block
maxBlockSize += availableMem;
return maxBlockSize;
private int bitcount(int n) {
int bit = 0;
while(n!=0) {
n >>= 1;
return bit;
// Finishes the current span. If a follow-on span is available (newSpan != null), the
// current span is submitted to sortmaster as a SortTask and collection continues in
// newSpan. Otherwise the data is sorted and spilled synchronously through spill(true), and
// a fresh span is opened on the next buffer block in round-robin order (the block is
// clear()ed for reuse).
// NOTE(review): this copy is missing several statements — the initializer of newSpan
// (presumably span.next(); confirm), the body of `if (pipelinedShuffle && ret)`
// (presumably sendPipelinedShuffleEvents()), the StopWatch start/stop calls, and
// whatever consumes `future`.
public void sort() throws IOException {
SortSpan newSpan =;
if(newSpan == null) {
//avoid sort/spill of empty span
StopWatch stopWatch = new StopWatch();
// sort in the same thread, do not wait for the thread pool
boolean ret = spill(true);
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + ( + " ms");
if (pipelinedShuffle && ret) {
// Use the next buffer
bufferIndex = (bufferIndex + 1) % buffers.size();
// Count how many times this block has been (re)used for a span.
bufferUsage.set(bufferIndex, bufferUsage.get(bufferIndex) + 1);
// Estimate record density from the span just spilled (default: 1M items x 16 bytes).
int items = 1024*1024;
int perItem = 16;
if(span.length() != 0) {
items = span.length();
perItem = span.kvbuffer.limit()/items;
items = (int) ((span.capacity)/(METASIZE+perItem));
if(items > 1024*1024) {
// our goal is to have 1M splits and sort early
items = 1024*1024;
Preconditions.checkArgument(buffers.get(bufferIndex) != null, "block should not be empty");
//TODO: fix per item being passed.
// NOTE(review): `items` computed above is not used; the literal (1024*1024) is passed
// as maxItems below.
span = new SortSpan((ByteBuffer)buffers.get(bufferIndex).clear(), (1024*1024),
perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf));
} else {
// queue up the sort
SortTask task = new SortTask(span, sorter);
LOG.debug("Submitting span={} for sort", span.toString());
Future<SpanIterator> future = sortmaster.submit(task);
span = newSpan;
// if pipelined shuffle is enabled, this method is called to send events for every spill
// Builds the shuffle event(s) for the most recent spill (numSpills - 1), flagged as not
// the last event, and sends them immediately through outputContext.
// NOTE(review): a LOG statement has been fused onto the sendEvents line in this copy.
private void sendPipelinedShuffleEvents() throws IOException{
List<Event> events = Lists.newLinkedList();
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1));
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
reportDetailedPartitionStats(), auxiliaryService, deflater);
outputContext.sendEvents(events); +
": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
// Computes the record's partition with partitioner.getPartition() and buffers the record.
// NOTE(review): the call this argument list belongs to (presumably collect(...)) is
// missing from this copy.
public void write(Object key, Object value)
throws IOException {
key, value, partitioner.getPartition(key, value, partitions));
// Appends one record to the current span. It validates the exact key/value classes and the
// partition range, serializes into span.kvbuffer (falling back to spillSingleRecord() when
// the record cannot be buffered), and records the record's metadata in span.kvmeta.
// NOTE(review): this copy is missing the serialization calls between the position reads,
// the statements after the overflow handling (returns, recursion-counter updates,
// sort()), and the kvmeta puts for PARTITION/KEYSTART/VALSTART; restore from upstream.
* Serialize the key, value to intermediate storage.
* When this method returns, kvindex must refer to sufficient unused
* storage to store one METADATA.
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
// Exact class match is required; subclasses of the configured key/value classes are rejected.
if (key.getClass() != serializationContext.getKeyClass()) {
throw new IOException("Type mismatch in key from map: expected "
+ serializationContext.getKeyClass().getName() + ", received "
+ key.getClass().getName());
if (value.getClass() != serializationContext.getValueClass()) {
throw new IOException("Type mismatch in value from map: expected "
+ serializationContext.getValueClass().getName() + ", received "
+ value.getClass().getName());
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
// TBD:FIX in TEZ-2574
// Metadata area of the span is full.
if (span.kvmeta.remaining() < METASIZE) {
// An empty span cannot take even one record: write it to its own spill.
if (span.length() == 0) {
spillSingleRecord(key, value, partition);
// Byte offsets of the key start, value start and value end within kvbuffer.
int keystart = span.kvbuffer.position();
int valstart = -1;
int valend = -1;
try {
valstart = span.kvbuffer.position();
valend = span.kvbuffer.position();
} catch(BufferOverflowException overflow) {
// restore limit
// Spill directly when the span is empty or overflow has recursed past the buffer count.
if (span.length() == 0 || bufferOverflowRecursion > buffers.size()) {
// spill the current key value pair
spillSingleRecord(key, value, partition);
bufferOverflowRecursion = 0;
// try again
this.collect(key, value, partition);
if (bufferOverflowRecursion > 0) {
// Sort prefix: partition in the top partitionBits bits, the key proxy (if any) below it.
int prefix = 0;
if(hasher != null) {
prefix = hasher.getProxy(key);
prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
/* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
span.kvmeta.put(valend - valstart);
// Serialized bytes of key + value.
mapOutputByteCounter.increment(valend - keystart);
// Updates spill byte counters for one written partition segment. The update depends on
// whether final merge is enabled and, if it is, whether this is the first spill.
// NOTE(review): the counter-update statements of every branch are missing from this copy.
private void adjustSpillCounters(long rawLength, long compLength) {
if (!isFinalMergeEnabled()) {
} else {
if (numSpills > 0) {
// Reset the value will be set during the final merge.
} else {
// Set this up for the first write only. Subsequent ones will be handled in the final merge.
// Writes one record straight to a new spill file (spill number numSpills). Each partition
// gets an IFile segment; only the target partition receives data. With
// sendEmptyPartitionDetails, Writers are created only for the target partition, so the
// other partitions get zero-length index entries. The spill index is then written, and
// both paths are registered in spillFilePaths/spillFileIndexPaths.
// NOTE(review): this copy is missing the rest of the getSpillIndexFileForWrite call, the
// LOG opener fused onto `try {`, the body of the interrupt check, the finally-block cleanup
// of `writer` (presumably close; `writer = null` suggests it was already closed on the
// success path), and the bodies of the final-merge/pipelined branches.
// it is guaranteed that when spillSingleRecord is called, there is
// no merger spans queued in executor.
private void spillSingleRecord(final Object key, final Object value,
int partition) throws IOException {
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
// getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown
final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
spillFilePaths.put(numSpills, filename);
FSDataOutputStream out = rfs.create(filename, true, 4096);
ensureSpillFilePermissions(filename, rfs);
try { + ": Spilling to " + filename.toString() +
", indexFilename=" + indexFilename);
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
Writer writer = null;
try {
long segmentStart = out.getPos();
if (!sendEmptyPartitionDetails || (i == partition)) {
writer = new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
serializationContext.getValueClass(), codec, spilledRecordsCounter, null, false);
// we need not check for combiner since its a single record
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Counts the growth of the spill stream's position caused by this append.
mapOutputByteCounter.increment(out.getPos() - recordStart);
// Partitions without a Writer keep zero raw/compressed lengths in the index.
long rawLength = 0;
long partLength = 0;
if (writer != null) {
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
adjustSpillCounters(rawLength, partLength);
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) {
spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
if (!isFinalMergeEnabled()) {
//No final merge. Set the number of files offered via shuffle-handler
if (pipelinedShuffle) {
} finally {
// Waits for queued sort tasks via merger.ready(). If nothing is ready and
// ignoreEmptySpills is set, it returns false without writing. Otherwise it writes each
// partition's merged records (merger.filter(i)) to a new spill file, going through the
// combiner when one is configured and the partition has data. It records one index entry
// per partition, writes the spill index, and returns true. It returns false early if the
// thread is interrupted mid-spill.
// NOTE(review): this copy is missing a term of the `size` expression, the LOG opener fused
// onto ensureSpillFilePermissions, the last Writer argument, the loop condition of
// `while ( {`, the writer cleanup, the final-merge branch body, and the finally body
// (presumably out.close()).
public boolean spill(boolean ignoreEmptySpills) throws IOException {
FSDataOutputStream out = null;
try {
try {
boolean ret = merger.ready();
// if merger returned false and ignore merge is true,
// then return directly without spilling
if (!ret && ignoreEmptySpills){
return false;
} catch (InterruptedException e) {
// Preserve the interrupt status before converting to an IOException subtype.
Thread.currentThread().interrupt(); + ": Interrupted while waiting for mergers to complete");
throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e);
// create spill file
// Size hint for the spill file: buffered capacity plus a header allowance per partition.
final long size = capacity +
+ (partitions * APPROX_HEADER_LENGTH);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename, true, 4096);
ensureSpillFilePermissions(filename, rfs); + ": Spilling to " + filename.toString());
for (int i = 0; i < partitions; ++i) {
// NOTE(review): returning here leaves a partially written file registered in spillFilePaths.
if (isThreadInterrupted()) {
return false;
TezRawKeyValueIterator kvIter = merger.filter(i);
//write merged output to disk
long segmentStart = out.getPos();
Writer writer = null;
boolean hasNext = kvIter.hasNext();
// With sendEmptyPartitionDetails, empty partitions get no Writer (zero-length index entry).
if (hasNext || !sendEmptyPartitionDetails) {
writer = new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
serializationContext.getValueClass(), codec, spilledRecordsCounter, null,
if (combiner == null) {
while ( {
writer.append(kvIter.getKey(), kvIter.getValue());
} else {
if (hasNext) {
runCombineProcessor(kvIter, writer);
long rawLength = 0;
long partLength = 0;
if (writer != null) {
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
adjustSpillCounters(rawLength, partLength);
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
// Without final merge, per-partition stats accumulate across spills here.
if (!isFinalMergeEnabled() && reportPartitionStats()) {
partitionStats[i] += partLength;
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
if (!isFinalMergeEnabled()) {
//No final merge. Set the number of files offered via shuffle-handler
return true;
} finally {
if (out != null) {
// Returns true when the current thread's interrupt flag is set. Thread.isInterrupted()
// does not clear the flag. When `cleanup` is also set, sortmaster is shut down immediately
// with shutdownNow().
// NOTE(review): the cleanup call inside `if (cleanup)` and the LOG opener fused onto the
// shutdownNow line are missing from this copy.
private boolean isThreadInterrupted() throws IOException {
if (Thread.currentThread().isInterrupted()) {
if (cleanup) {
sortmaster.shutdownNow(); + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
.isShutdown() + ", terminated=" + sortmaster.isTerminated());
return true;
return false;
// Forces a final spill and publishes the output.
// - Final merge disabled: generates shuffle events into finalEvents. Pipelined shuffle
//   produces only the last one, since earlier ones were already sent.
// - Final merge enabled, one spill: renames that spill and its index to the final output files.
// - Final merge enabled, several spills: merges every partition across spills into one
//   final file plus index, then deletes the per-spill files.
// NOTE(review): this copy is missing the body of the initial interrupt check, the LOG opener
// fused onto `try {`, the forced spill call itself, the early return for an empty index
// list, the RHS of `mergeFactor`, the line adding each DiskSegment to segmentList, the rest
// of the TezMerger.writeFile call, and the writer/finalOut close calls.
public void flush() throws IOException {
final String uniqueIdentifier = outputContext.getUniqueIdentifier();
* Possible that the thread got interrupted when flush was happening or when the flush was
* never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close()
* on all I/O. At that time, this is safe to cleanup
if (isThreadInterrupted()) {
try { + ": Starting flush of map output");
// force a spill in flush()
// case 1: we want to force because of following scenarios:
// we have no keys written, and flush got called
// we want atleast one spill(be it empty)
// case 2: in pipeline shuffle case, we have no way of
// knowing the last key being written until flush is called
// so for flush()->spill() we want to force spill so that
// we can send pipeline shuffle event with last event true.
//safe to clean up
if(indexCacheList.isEmpty()) {
* If we do not have this check, and if the task gets killed in the middle, it can throw
* NPE leading to distraction when debugging.
if (LOG.isDebugEnabled()) {
+ ": Index list is empty... returning");
if (!isFinalMergeEnabled()) {
//For pipelined shuffle, previous events are already sent. Just generate the last event alone
int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
int endIndex = numSpills;
for (int i = startIndex; i < endIndex; i++) {
boolean isLastEvent = (i == numSpills - 1);
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
ShuffleUtils.generateEventOnSpill(finalEvents, isFinalMergeEnabled(), isLastEvent,
outputContext, i, indexCacheList.get(i), partitions,
sendEmptyPartitionDetails, pathComponent, partitionStats,
reportDetailedPartitionStats(), auxiliaryService, deflater); + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
numAdditionalSpills.increment(numSpills - 1);
//In case final merge is required, the following code path is executed.
if (numSpills == 1) {
// someday be able to pass this directly to shuffle
// without writing to disk
final Path filename = spillFilePaths.get(0);
final Path indexFilename = spillFileIndexPaths.get(0);
finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
sameVolRename(filename, finalOutputFile);
sameVolRename(indexFilename, finalIndexFile);
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + numSpills +
", finalOutputFile=" + finalOutputFile + ", "
+ "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
// Partition stats are taken from the renamed index file.
TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, localFs);
if (reportPartitionStats()) {
for (int i = 0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getPartLength();
// ??? why are events not being sent here?
// Multi-spill final merge: one output file and index.
finalOutputFile =
mapOutputFile.getOutputFileForWrite(0); //TODO
finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(0); //TODO
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName() + ": " +
"numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
+ finalIndexFile);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
ensureSpillFilePermissions(finalOutputFile, rfs);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
// Becomes true if any spill contributes a segment for this partition.
boolean shouldWrite = false;
//create the segments to be merged
List<Segment> segmentList =
new ArrayList<Segment>(numSpills);
for (int i = 0; i < numSpills; i++) {
Path spillFilename = spillFilePaths.get(i);
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
// With sendEmptyPartitionDetails, spills with no data for this partition are skipped.
if (indexRecord.hasData() || !sendEmptyPartitionDetails) {
shouldWrite = true;
DiskSegment s =
new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(),
indexRecord.getPartLength(), codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, true);
int mergeFactor =
// sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
serializationContext, codec,
segmentList, mergeFactor,
new Path(uniqueIdentifier),
(RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
progressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null, merger.needsRLE()); // Not using any Progress in TezMerger. Should just work.
//write merged output to disk
long segmentStart = finalOut.getPos();
long rawLength = 0;
long partLength = 0;
if (shouldWrite) {
Writer writer = new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), finalOut,
serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
spilledRecordsCounter, null, merger.needsRLE());
// The combiner runs only when configured and there are at least minSpillsForCombine spills.
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer, progressable,
} else {
runCombineProcessor(kvIter, writer);
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
partitionStats[parts] += partLength;
numShuffleChunks.setValue(1); //final merge has happened.
spillRec.writeToFile(finalIndexFile, conf, localFs);
// The per-spill data and index files are no longer needed after the merge.
for (int i = 0; i < numSpills; i++) {
Path indexFilename = spillFileIndexPaths.get(i);
Path spillFilename = spillFilePaths.get(i);
rfs.delete(indexFilename, true);
rfs.delete(spillFilename, true);
} catch(InterruptedException ie) {
if (cleanup) {
throw new IOInterruptedException("Interrupted while closing Output", ie);
// Returns finalEvents. flush() fills it when final merge is disabled.
// NOTE(review): upstream presumably calls super.close() before returning; that line is
// not visible in this copy — confirm.
* Close and send events.
* @return events to be returned by the edge.
* @throws IOException parent can throw this.
public final List<Event> close() throws IOException {
return finalEvents;
private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
int getPartition();
Integer peekPartition();
private static class BufferStreamWrapper extends OutputStream
private final ByteBuffer out;
public BufferStreamWrapper(ByteBuffer out) {
this.out = out;
public void write(int b) throws IOException { out.put((byte)b); }
public void write(byte[] b) throws IOException { out.put(b); }
public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
private static final class InputByteBuffer extends DataInputBuffer {
private byte[] buffer = new byte[256];
private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
private void resize(int length) {
if(length > buffer.length || (buffer.length > 10 * (1+length))) {
// scale down as well as scale up across values
buffer = new byte[length];
wrapped = ByteBuffer.wrap(buffer);
// shallow copy
public void reset(DataInputBuffer clone) {
byte[] data = clone.getData();
int start = clone.getPosition();
int length = clone.getLength() - start;
super.reset(data, start, length);
// deep copy
public void copy(DataInputBuffer clone) {
byte[] data = clone.getData();
int start = clone.getPosition();
int length = clone.getLength() - start;
System.arraycopy(data, start, buffer, 0, length);
super.reset(buffer, 0, length);
// SortSpan: a region of a buffer block split into a record-metadata area (kvmeta, NMETA
// ints per record) and a serialized key/value data area (kvbuffer, written through `out`).
// It implements IndexedSortable so an IndexedSorter can order records by (partition, key)
// by swapping only their metadata entries.
// NOTE(review): several lines of this class are garbled in this copy (constructor buffer
// positioning and LOG call, the kvmeta initializer, the comparator calls in compareKeys()
// and compareInternal(), and the LOG calls in next()/end()); restore from upstream.
private final class SortSpan implements IndexedSortable {
// Per-record metadata: PARTITION, KEYSTART, VALSTART, VALLEN at offsetFor(i).
final IntBuffer kvmeta;
// Backing array and byte offset of kvmeta, used by swap() to move raw METASIZE-byte entries.
final byte[] rawkvmeta;
final int kvmetabase;
// Serialized keys and values.
final ByteBuffer kvbuffer;
final NonSyncDataOutputStream out;
final RawComparator comparator;
// Scratch space for swap().
final byte[] imeta = new byte[METASIZE];
private int index = 0;
// Number of key comparisons that found equal keys.
private long eq = 0;
// Set by end() when the space for the next span came from a newly allocated block.
private boolean reinit = false;
// Bytes available in `source` when the span was created.
private int capacity;
public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator) {
capacity = source.remaining();
int metasize = METASIZE*maxItems;
int dataSize = maxItems * perItem;
if(capacity < (metasize+dataSize)) {
// try to allocate less meta space, because we have sample data
metasize = METASIZE*(capacity/(perItem+METASIZE));
ByteBuffer reserved = source.duplicate();
reserved.mark(); + ": " + "reserved.remaining()=" +
reserved.remaining() + ", reserved.metasize=" + metasize);
// NOTE(review): the statements that reposition `reserved` between these two slice()
// calls are missing; as shown both slices would start at the same position.
kvbuffer = reserved.slice();
ByteBuffer kvmetabuffer = reserved.slice();
rawkvmeta = kvmetabuffer.array();
kvmetabase = kvmetabuffer.arrayOffset();
// NOTE(review): truncated; presumably views kvmetabuffer as an IntBuffer — confirm.
kvmeta = kvmetabuffer
out = new NonSyncDataOutputStream(
new BufferStreamWrapper(kvbuffer));
this.comparator = comparator;
// Sorts records [0, length()) in place (skipped for 0 or 1 records) and returns an
// iterator over them.
public SpanIterator sort(IndexedSorter sorter) {
long start = System.currentTimeMillis();
if(length() > 1) {
sorter.sort(this, 0, length(), progressable);
} + ": " + "done sorting span=" + index + ", length=" + length() + ", "
+ "time=" + (System.currentTimeMillis() - start));
return new SpanIterator((SortSpan)this);
// Index of record i's first metadata int in kvmeta.
int offsetFor(int i) {
return (i * NMETA);
// Swaps the raw METASIZE-byte metadata entries of records mi and mj; data bytes do not move.
public void swap(final int mi, final int mj) {
final int kvi = offsetFor(mi);
final int kvj = offsetFor(mj);
// int offsets -> byte offsets (x4) within the backing array
final int kvioff = kvmetabase + (kvi << 2);
final int kvjoff = kvmetabase + (kvj << 2);
System.arraycopy(rawkvmeta, kvioff, imeta, 0, METASIZE);
System.arraycopy(rawkvmeta, kvjoff, rawkvmeta, kvioff, METASIZE);
System.arraycopy(imeta, 0, rawkvmeta, kvjoff, METASIZE);
// Compares the keys (bytes [KEYSTART, VALSTART)) of the records whose metadata starts at
// kvi/kvj; an empty key orders before a non-empty one.
protected int compareKeys(final int kvi, final int kvj) {
final int istart = kvmeta.get(kvi + KEYSTART);
final int jstart = kvmeta.get(kvj + KEYSTART);
final int ilen = kvmeta.get(kvi + VALSTART) - istart;
final int jlen = kvmeta.get(kvj + VALSTART) - jstart;
if (ilen == 0 || jlen == 0) {
if (ilen == jlen) {
return ilen - jlen;
final byte[] buf = kvbuffer.array();
final int off = kvbuffer.arrayOffset();
// sort by key
final int cmp =, off + istart, ilen, buf, off + jstart, jlen);
if(cmp == 0) eq++;
return cmp;
// Orders records by partition first, then by key.
public int compare(final int mi, final int mj) {
final int kvi = offsetFor(mi);
final int kvj = offsetFor(mj);
final int kvip = kvmeta.get(kvi + PARTITION);
final int kvjp = kvmeta.get(kvj + PARTITION);
// sort by partition
if (kvip != kvjp) {
return kvip - kvjp;
return compareKeys(kvi, kvj);
// Creates the span that follows this one, using the space from end(). Returns null when
// there is no space or this span is empty.
public SortSpan next() {
ByteBuffer remaining = end();
if(remaining != null) {
SortSpan newSpan = null;
// Size the next span from this span's observed record density (end() returned
// non-null, so length() > 0 here).
int items = length();
int perItem = kvbuffer.position()/items;
if (reinit) { //next mem block
//quite possible that the previous span had a length of 1. It is better to reinit here for new span.
items = 1024*1024;
perItem = 16;
final RawComparator newComparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
if (this.comparator == newComparator) {
LOG.warn("Same comparator used. comparator={}, newComparator={},"
+ " hashCode: comparator={}, newComparator={}",
this.comparator, newComparator,
newSpan = new SortSpan(remaining, items, perItem, newComparator);
newSpan.index = index+1; + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
return newSpan;
return null;
// Number of records, derived from kvmeta's limit.
public int length() {
return kvmeta.limit()/NMETA;
// Returns the space for the next span. That is the unwritten tail of kvbuffer when it can
// fit at least one more average record, or else a freshly allocated block (setting
// reinit). Returns null if the span is empty or no block is left.
public ByteBuffer end() {
ByteBuffer remaining = kvbuffer.duplicate();
remaining = remaining.slice();
int items = length();
if(items == 0) {
return null;
int perItem = kvbuffer.position()/items; + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
if(remaining.remaining() < METASIZE+perItem) {
//Check if we can get the next Buffer from the main buffer list
ByteBuffer space = allocateSpace();
if (space != null) { + ": " + "Getting memory from next block in the list, recordsWritten=" +
reinit = true;
return space;
return null;
return remaining;
// Compares record `index` of this span with (needle, needlePart): partitions first, then
// keys. A negative result means this record orders first.
public int compareInternal(final DataInputBuffer needle, final int needlePart, final int index) {
int cmp = 0;
final int keystart;
final int valstart;
final int partition;
partition = kvmeta.get(this.offsetFor(index) + PARTITION);
if(partition != needlePart) {
cmp = (partition-needlePart);
} else {
keystart = kvmeta.get(this.offsetFor(index) + KEYSTART);
valstart = kvmeta.get(this.offsetFor(index) + VALSTART);
final byte[] buf = kvbuffer.array();
final int off = kvbuffer.arrayOffset();
cmp =,
keystart + off , (valstart - keystart),
needle.getPosition(), (needle.getLength() - needle.getPosition()));
return cmp;
public long getEq() {
return eq;
public String toString() {
return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
// Iterates, in index order, over the records of one SortSpan. kvindex is the current record
// (-1 before the first next()) and maxindex is the last valid index (-1 for an empty span).
// It is Comparable so iterators can be ordered by their current record (see compareTo).
private static class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
private int kvindex = -1;
private final int maxindex;
private final IntBuffer kvmeta;
private final ByteBuffer kvbuffer;
private final SortSpan span;
// Reused holders returned by getKey()/getValue(); they alias kvbuffer's array.
private final InputByteBuffer key = new InputByteBuffer();
private final InputByteBuffer value = new InputByteBuffer();
private final Progress progress = new LocalProgress();
// Minimum number of remaining records for bisect() to attempt a binary search.
private static final int minrun = (1 << 4);
/**
 * Creates an iterator over the records of {@code span}, positioned before the first record.
 */
public SpanIterator(SortSpan span) {
  this.span = span;
  this.kvbuffer = span.kvbuffer;
  this.kvmeta = span.kvmeta;
  // Index of the last record; -1 when the span holds no records.
  this.maxindex = span.kvmeta.limit() / NMETA - 1;
}
public DataInputBuffer getKey() {
final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
final byte[] buf = kvbuffer.array();
final int off = kvbuffer.arrayOffset();
key.reset(buf, off + keystart, valstart - keystart);
return key;
public DataInputBuffer getValue() {
final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
final byte[] buf = kvbuffer.array();
final int off = kvbuffer.arrayOffset();
value.reset(buf, off + valstart, vallen);
return value;
public boolean next() {
// caveat: since we use this as a comparable in the merger
if(kvindex == maxindex) return false;
kvindex += 1;
if(kvindex % 100 == 0) {
progress.set(1 - ((maxindex - kvindex) / (float) maxindex));
return true;
public boolean hasNext() {
return (kvindex == maxindex);
public void close() {
public Progress getProgress() {
return progress;
public boolean isSameKey() throws IOException {
return false;
public int getPartition() {
final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
return partition;
public Integer peekPartition() {
if (!hasNext()) {
return null;
} else {
return kvmeta.get(span.offsetFor(kvindex + 1) + PARTITION);
public int size() {
return (maxindex - kvindex);
public int compareTo(SpanIterator other) {
return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
public String toString() {
return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
* bisect returns the next insertion point for a given raw key, skipping keys
* which are <= needle using a binary search instead of a linear comparison.
* This is massively efficient when long strings of identical keys occur.
* @param needle
* @param needlePart
* @return
int bisect(DataInputBuffer needle, int needlePart) {
int start = kvindex;
int end = maxindex-1;
int mid = start;
int cmp = 0;
if(end - start < minrun) {
return 0;
if(span.compareInternal(needle, needlePart, start) > 0) {
return kvindex;
// bail out early if we haven't got a min run
if(span.compareInternal(needle, needlePart, start+minrun) > 0) {
return 0;
if(span.compareInternal(needle, needlePart, end) < 0) {
return end - kvindex;
boolean found = false;
// we sort 100k items, the max it can do is 20 loops, but break early
for(int i = 0; start < end && i < 16; i++) {
mid = start + (end - start)/2;
cmp = span.compareInternal(needle, needlePart, mid);
if(cmp == 0) {
start = mid;
found = true;
} else if(cmp < 0) {
start = mid;
found = true;
if(cmp > 0) {
end = mid;
if(found) {
return start - kvindex;
return 0;
private static class SortTask extends CallableWithNdc<SpanIterator> {
private final SortSpan sortable;
private final IndexedSorter sorter;
public SortTask(SortSpan sortable, IndexedSorter sorter) {
this.sortable = sortable;
this.sorter = sorter;
protected SpanIterator callInternal() {
return sortable.sort(sorter);
/**
 * View over a {@link PartitionedRawKeyValueIterator} that yields only the records of one
 * partition, selected with {@link #reset(int)}. A record matches when the top
 * {@code partitionBits} bits of its {@code getPartition()} value equal the selected
 * partition.
 *
 * <p>When {@link #next()} reads a record of a different partition, it sets {@code dirty}
 * and returns false. On the next call, {@code dirty ||} short-circuits, so that record is
 * examined again instead of the underlying iterator being advanced first.
 */
private class PartitionFilter implements TezRawKeyValueIterator {
private final PartitionedRawKeyValueIterator iter;
// Partition currently selected; set by reset().
private int partition;
// True when `iter` is positioned on a record that was read as a lookahead but did not
// belong to the selected partition.
private boolean dirty = false;
public PartitionFilter(PartitionedRawKeyValueIterator iter) {
this.iter = iter;
// Key/value accessors delegate to the underlying iterator.
public DataInputBuffer getKey() throws IOException { return iter.getKey(); }
public DataInputBuffer getValue() throws IOException { return iter.getValue(); }
// Does nothing; the underlying iterator is not closed here.
public void close() throws IOException { }
// Returns a fresh Progress instance on every call.
public Progress getProgress() {
return new Progress();
public boolean isSameKey() throws IOException {
return iter.isSameKey();
// Returns true when the underlying iterator is positioned on a record of the selected
// partition.
// NOTE(review): the condition on the next line but one is truncated in this copy; the
// call that advances `iter` after `dirty ||` is missing. Restore from upstream.
public boolean next() throws IOException {
if(dirty || {
int prefix = iter.getPartition();
if((prefix >>> (32 - partitionBits)) == partition) {
dirty = false; // we found what we were looking for, good
return true;
} else if(!dirty) {
dirty = true; // we read ahead and found a record of another partition
return false;
// If dirty, the pending record is the underlying iterator's current one. Otherwise, peek
// at the following record's partition without advancing.
public boolean hasNext() throws IOException {
if (dirty || iter.hasNext()) {
Integer part;
if (dirty) {
part = iter.getPartition();
} else {
part = iter.peekPartition();
if (part != null) {
return (part >>> (32 - partitionBits)) == partition;
return false;
// Selects the partition to filter for.
public void reset(int partition) {
this.partition = partition;
// Returns the selected partition (as set by reset), not the current record's raw
// partition value.
public int getPartition() {
return this.partition;
private static class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
private static final long serialVersionUID = 1L;
public SpanHeap() {
* {@link PriorityQueue}.poll() by a different name
* @return
public SpanIterator pop() {
return this.poll();
public boolean needsRLE() {
return merger.needsRLE();
/**
 * Merges several sorted {@link SpanIterator}s through a {@link SpanHeap} ordered by each
 * iterator's current record, and exposes the result as one
 * {@link PartitionedRawKeyValueIterator}.
 *
 * <p>Galloping works as follows. When the iterator just popped from the heap is the same
 * object as {@code horse}, {@link SpanIterator#bisect} is asked how many of its following
 * records sort no later than the next heap head. That count (minus one) is stored in
 * {@code gallop}. While {@code gallop > 0}, {@code pop()} returns {@code horse} without
 * consulting the heap.
 */
private final class SpanMerger implements PartitionedRawKeyValueIterator {
// Buffers returned by getKey()/getValue().
InputByteBuffer key = new InputByteBuffer();
InputByteBuffer value = new InputByteBuffer();
// Partition returned by getPartition(); set in next() from the popped iterator.
int partition;
// Pending sort results, drained front-to-back in ready().
private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>();
private SpanHeap heap = new SpanHeap();
// Single shared per-partition view over this merger; returned by filter(int).
private PartitionFilter partIter;
// While > 0, pop() returns `horse` instead of consulting the heap.
private int gallop = 0;
// Iterator being galloped on; pop() compares the heap head against it by reference.
private SpanIterator horse;
// Sums of span lengths and span eq counters, accumulated in ready(); used by needsRLE().
private long total = 0;
private long eq = 0;
/**
 * Creates a merger whose {@link PartitionFilter} view wraps this instance.
 *
 * <p>No comparator is needed: the heap relies on {@link SpanIterator} being
 * {@link Comparable}.
 */
public SpanMerger() {
  this.partIter = new PartitionFilter(this);
}
// Presumably advances `iter` and offers it to the heap if it has a record (confirm).
// NOTE(review): the `if` condition below is truncated in this copy (only `if(` remains),
// and the statements it guards are not visible. Restore from upstream.
public final void add(SpanIterator iter) {
if( {
// Registers a pending sort result.
// NOTE(review): the method body is missing in this copy. Since ready() drains `futures`,
// presumably it appends `iter` to that list (confirm).
public final void add(Future<SpanIterator> iter) {
/**
 * Waits for every pending sort future, then prepares the merge.
 *
 * @return {@code false} if the heap is empty once the futures are drained; otherwise
 *         {@code true}, after adding each heap iterator's span length and eq counter to
 *         {@code total} and {@code eq}
 * @throws IOException wrapping the {@link ExecutionException} of a failed sort task
 *         (logged before rethrowing)
 * @throws InterruptedException if interrupted while waiting on a future
 */
public final boolean ready() throws IOException, InterruptedException {
// Captured before draining so the error log can report how many futures there were.
int numSpanItr = futures.size();
try {
SpanIterator iter = null;
// Futures are removed from the front of the list; get() blocks until each sort finishes.
// NOTE(review): no statement adding the completed `iter` to the heap is visible in this
// copy, yet the heap is inspected below. Check for a dropped line.
while(this.futures.size() > 0) {
Future<SpanIterator> futureIter = this.futures.remove(0);
iter = futureIter.get();
// NOTE(review): `sb` is never appended to in the visible lines.
StringBuilder sb = new StringBuilder();
if (heap.size() == 0) {
return false;
for(SpanIterator sp: heap) {
total += sp.span.length();
eq += sp.span.getEq();
// NOTE(review): the statement ending on the next line is truncated; its beginning
// (presumably a log call) is missing from this copy.
} + ": " + "Heap = " + sb.toString());
return true;
} catch(ExecutionException e) {
LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={},"
+ " futures.size={}, destVertexName={}",
heap.size(), total, eq, partition, gallop, numSpanItr, futures.size(),
outputContext.getDestinationVertexName(), e);
throw new IOException(e);
/**
 * Returns the iterator holding the next record to emit, or {@code null} when the heap is
 * empty (SpanHeap.pop() has PriorityQueue.poll() semantics).
 *
 * <p>While {@code gallop > 0}, the galloping iterator {@code horse} is returned without
 * touching the heap. Otherwise the heap head is removed. If that head is the same object as
 * {@code horse}, {@link SpanIterator#bisect} against the next heap head sets a new
 * {@code gallop} count.
 */
private SpanIterator pop() {
if(gallop > 0) {
// NOTE(review): no decrement of `gallop` is visible here. Without one, this branch
// would keep returning `horse`. Check for a dropped decrement.
return horse;
SpanIterator current = heap.pop();
SpanIterator next = heap.peek();
// Reference comparison on purpose: it detects that the same iterator was popped again.
if(next != null && current != null &&
((Object)horse) == ((Object)current)) {
// TODO: a better threshold check than 1 key repeating
gallop = current.bisect(next.getKey(), next.getPartition())-1;
// NOTE(review): closing braces are missing in this copy. `horse = current` must run
// outside the `if` above; inside it, it would be a no-op because horse == current there.
horse = current;
return current;
public boolean needsRLE() {
return (eq > 0.1 * total);
private SpanIterator peek() {
if (gallop > 0) {
return horse;
return heap.peek();
/**
 * Advances to the next merged record.
 *
 * @return {@code true} if {@code pop()} produced an iterator, {@code false} once every
 *         span is exhausted
 */
public final boolean next() {
SpanIterator current = pop();
if(current != null) {
partition = current.getPartition();
// NOTE(review): the statements that set `key`/`value` and that re-insert or advance
// `current` are not visible in this copy; both branches below are empty as shown.
if(gallop <= 0) {
// since all keys and values are references to the kvbuffer, no more deep copies
} else {
// galloping: no deep copies are required here either
return true;
return false;
public boolean hasNext() {
return peek() != null;
public Integer peekPartition() {
if (!hasNext()) {
return null;
} else {
SpanIterator peek = peek();
return peek.getPartition();
/** Returns this merger's {@code key} buffer. */
public DataInputBuffer getKey() {
  return key;
}

/** Returns this merger's {@code value} buffer. */
public DataInputBuffer getValue() {
  return value;
}

/** Returns the partition recorded by the last successful {@code next()}. */
public int getPartition() {
  return partition;
}
public void close() throws IOException {
public Progress getProgress() {
return new Progress();
public boolean isSameKey() throws IOException {
return false;
/**
 * Intended to return a view of this merger restricted to {@code partition}. The same
 * {@link PartitionFilter} instance is returned on every call.
 * NOTE(review): {@code partition} is not used in the visible lines. Presumably
 * {@code partIter.reset(partition)} belongs before the return (confirm).
 */
public TezRawKeyValueIterator filter(int partition) {
return partIter;