/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.sort.impl.dflt;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Deflater;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
import org.apache.tez.common.Preconditions;
import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
@SuppressWarnings({"unchecked", "rawtypes"})
public final class DefaultSorter extends ExternalSorter implements IndexedSortable {
private static final Logger LOG = LoggerFactory.getLogger(DefaultSorter.class);
// TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete)
/**
* The size of each record in the index file for the map-outputs.
*/
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
private final static int APPROX_HEADER_LENGTH = 150;
// k/v accounting
private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata
int kvend; // marks end of spill metadata
int kvindex; // marks end of fully serialized records
int equator; // marks origin of meta/serialization
int bufstart; // marks beginning of spill
int bufend; // marks beginning of collectable
int bufmark; // marks end of record
int bufindex; // marks end of collected
int bufvoid; // marks the point where we should stop
// reading at the end of the buffer
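// All of the above indices address a single circular buffer shared by
// serialized record bytes and their accounting metadata: record bytes grow
// forward from the equator (bufindex) while METASIZE-byte accounting
// entries are allocated backwards from it (kvindex), so the two regions
// converge and a spill reclaims the space between them.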
private byte[] kvbuffer; // main output buffer
private final byte[] b0 = new byte[0];
protected static final int VALSTART = 0; // val offset in acct
protected static final int KEYSTART = 1; // key offset in acct
protected static final int PARTITION = 2; // partition offset in acct
protected static final int VALLEN = 3; // length of value
protected static final int NMETA = 4; // num meta ints
protected static final int METASIZE = NMETA * 4; // size in bytes
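// Each record's accounting entry occupies NMETA consecutive ints in kvmeta
// (METASIZE bytes of kvbuffer), laid out as:
//   kvmeta[kvoff + VALSTART]  -> byte offset of the value in kvbuffer
//   kvmeta[kvoff + KEYSTART]  -> byte offset of the key in kvbuffer
//   kvmeta[kvoff + PARTITION] -> partition number of the record
//   kvmeta[kvoff + VALLEN]    -> serialized length of the value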
// spill accounting
final int maxRec;
final int softLimit;
boolean spillInProgress;
int bufferRemaining;
volatile Throwable sortSpillException = null;
final int minSpillsForCombine;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
final BlockingBuffer bb = new BlockingBuffer();
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
private final Deflater deflater;
private final String auxiliaryService;
final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
private final int indexCacheMemoryLimit;
private int totalIndexCacheMemory;
private long totalKeys = 0;
private long sameKey = 0;
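// Hard cap on the sort buffer size for DefaultSorter: beyond this the
// circular-buffer index arithmetic in collect() can overflow int (see the
// worked example in collect()).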
public static final int MAX_IO_SORT_MB = 1800;
public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
super(outputContext, conf, numOutputs, initialMemoryAvailable);
deflater = TezCommonUtils.newBestCompressionDeflater();
// sanity checks
final float spillper = this.conf.getFloat(
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
final int sortmb = computeSortBufferSize((int) availableMemoryMb, outputContext.getDestinationVertexName());
Preconditions.checkArgument(spillper <= (float) 1.0 && spillper > (float) 0.0,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT
+ " should be greater than 0 and less than or equal to 1");
indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);
boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
if (confPipelinedShuffle) {
LOG.warn(outputContext.getInputOutputVertexNames() + ": " +
TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work "
+ "with DefaultSorter. It is supported only with PipelinedSorter.");
}
auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
maxRec = kvmeta.capacity() / NMETA;
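// a spill is triggered once collected bytes (records plus metadata) reach
// spillper of the buffer; bufferRemaining counts down toward that limit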
softLimit = (int)(kvbuffer.length * spillper);
bufferRemaining = softLimit;
if (LOG.isInfoEnabled()) {
LOG.info(outputContext.getDestinationVertexName() + ": "
+ TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "=" + sortmb
+ ", soft limit=" + softLimit
+ ", bufstart=" + bufstart
+ ", bufvoid=" + bufvoid
+ ", kvstart=" + kvstart
+ ", legnth=" + maxRec
+ ", finalMergeEnabled=" + isFinalMergeEnabled());
}
// k/v serialization
valSerializer.open(bb);
keySerializer.open(bb);
spillInProgress = false;
minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread {"
+ TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "}");
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
//interrupt spill thread
spillThread.interrupt();
Thread.currentThread().interrupt();
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}
@VisibleForTesting
static int computeSortBufferSize(int availableMemoryMB, String logContext) {
if (availableMemoryMB <= 0) {
throw new RuntimeException(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
"=" + availableMemoryMB + ". It should be > 0");
}
if (availableMemoryMB > MAX_IO_SORT_MB) {
LOG.warn(logContext + ": Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
"=" + availableMemoryMB + " to " + MAX_IO_SORT_MB
+ " (max sort buffer size supported forDefaultSorter)");
}
// cap sort buffer to MAX_IO_SORT_MB for DefaultSorter.
// Not using 2047 to avoid any ArrayIndexOutOfBoundsException in the collect() phase.
return Math.min(MAX_IO_SORT_MB, availableMemoryMB);
}
@Override
public void write(Object key, Object value)
throws IOException {
collect(
key, value, partitioner.getPartition(key, value, partitions));
}
/**
* Serialize the key, value to intermediate storage.
* When this method returns, kvindex must refer to sufficient unused
* storage to store one metadata entry (METASIZE bytes).
*/
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
if (key.getClass() != serializationContext.getKeyClass()) {
throw new IOException("Type mismatch in key from map: expected "
+ serializationContext.getKeyClass().getName() + ", received "
+ key.getClass().getName());
}
if (value.getClass() != serializationContext.getValueClass()) {
throw new IOException("Type mismatch in value from map: expected "
+ serializationContext.getValueClass().getName() + ", received "
+ value.getClass().getName());
}
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")" + ", TotalPartitions: " + partitions);
}
checkSpillException();
bufferRemaining -= METASIZE;
if (bufferRemaining <= 0) {
// start spill if the thread is not running and the soft limit has been
// reached
spillLock.lock();
try {
do {
if (!spillInProgress) {
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
// serialized, unspilled bytes always lie between kvindex and
// bufindex, crossing the equator. Note that any void space
// created by a reset must be included in "used" bytes
final int bUsed = distanceTo(kvbidx, bufindex);
final boolean bufsoftlimit = bUsed >= softLimit;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE;
continue;
} else if (bufsoftlimit && kvindex != kvend) {
// spill records, if any were collected; the latter check matters since
// metadata alignment alone can trip the spill percentage
startSpill();
final int avgRec = (int)
(mapOutputByteCounter.getValue() /
mapOutputRecordCounter.getValue());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
/**
* Reason for capping the sort buffer at MAX_IO_SORT_MB:
* e.g. kvbuffer.length = 2146435072 (2047 MB).
* Corner case: bufindex = 2026133899, kvbidx = 523629312.
* distkvi = mod - i + j = 2146435072 - 2026133899 + 523629312 = 643930485
* newPos = 2026133899 + max(2 * METASIZE - 1, min(643930485 / 2, 271128624))
* = 2026133899 + 271128624 = 2297262523, which overflows int before the
* modulo is applied.
*/
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
// metadata max
distanceTo(bufend, newPos),
Math.min(
// serialization max
distanceTo(newPos, serBound),
// soft limit
softLimit)) - 2 * METASIZE;
}
}
} while (false);
} finally {
spillLock.unlock();
}
}
try {
// serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
outputContext.notifyProgress();
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex: accounting entries are allocated backwards, one
// NMETA-int slot per record, wrapping modulo kvmeta.capacity()
kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity());
totalKeys++;
} catch (MapBufferTooSmallException e) {
LOG.info(
outputContext.getInputOutputVertexNames() + ": Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
/**
* Set the point from which meta and serialization data expand. The meta
* indices are aligned with the buffer, so metadata never spans the ends of
* the circular buffer.
*/
private void setEquator(int pos) {
equator = pos;
// set index prior to first entry, aligned at meta boundary
final int aligned = pos - (pos % METASIZE);
// Cast one of the operands to long to avoid integer overflow
kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
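// e.g. (hypothetical values) with METASIZE = 16 and kvbuffer.length = 1024,
// setEquator(100) aligns to 100 - (100 % 16) = 96 and kvindex =
// ((96 - 16 + 1024) % 1024) / 4 = 20, i.e. the first metadata entry will
// occupy bytes 80..95, immediately below the aligned equator at 96.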
if (LOG.isInfoEnabled()) {
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex +
"(" + (kvindex * 4) + ")");
}
}
/**
* The spill is complete, so set the buffer and meta indices to be equal to
* the new equator to free space for continuing collection. Note that when
* kvindex == kvend == kvstart, the buffer is empty.
*/
private void resetSpill() {
final int e = equator;
bufstart = bufend = e;
final int aligned = e - (e % METASIZE);
// set start/end to point to first meta record
// Cast one of the operands to long to avoid integer overflow
kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
if (LOG.isInfoEnabled()) {
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" +
(kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}
}
/**
* Compute the distance in bytes between two indices in the serialization
* buffer.
* @see #distanceTo(int,int,int)
*/
final int distanceTo(final int i, final int j) {
return distanceTo(i, j, kvbuffer.length);
}
/**
* Compute the distance between two indices in the circular buffer given the
* max distance.
*/
int distanceTo(final int i, final int j, final int mod) {
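// e.g. distanceTo(10, 4, 16) = 16 - 10 + 4 = 10: walking forward from
// index 10 and wrapping at 16 takes 10 bytes to reach index 4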
return i <= j
? j - i
: mod - i + j;
}
/**
* For the given meta position, return the offset into the int-sized
* kvmeta buffer.
*/
int offsetFor(int metapos) {
return (metapos % maxRec) * NMETA;
}
/**
* Compare logical range, st i, j MOD offset capacity.
* Compare by partition, then by key.
* @see IndexedSortable#compare
*/
public int compare(final int mi, final int mj) {
final int kvi = offsetFor(mi);
final int kvj = offsetFor(mj);
final int kvip = kvmeta.get(kvi + PARTITION);
final int kvjp = kvmeta.get(kvj + PARTITION);
// sort by partition
if (kvip != kvjp) {
return kvip - kvjp;
}
// sort by key
int result = comparator.compare(kvbuffer,
kvmeta.get(kvi + KEYSTART),
kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
kvbuffer,
kvmeta.get(kvj + KEYSTART),
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
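// equal keys are tallied so spill() can decide whether run-length
// encoding of repeated keys in the IFile is worthwhile (see isRLENeeded)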
if (result == 0) {
sameKey++;
}
return result;
}
final byte META_BUFFER_TMP[] = new byte[METASIZE];
/**
* Swap metadata for items i,j
* @see IndexedSortable#swap
*/
public void swap(final int mi, final int mj) {
int iOff = (mi % maxRec) * METASIZE;
int jOff = (mj % maxRec) * METASIZE;
System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);
System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);
System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);
}
/**
* Inner class managing the spill of serialized records to disk.
*/
protected class BlockingBuffer extends NonSyncDataOutputStream {
public BlockingBuffer() {
super(new Buffer());
}
/**
* Mark end of record. Note that this is required if the buffer is to
* cut the spill in the proper place.
*/
public int markRecord() {
bufmark = bufindex;
return bufindex;
}
/**
* Set position from last mark to end of writable buffer, then rewrite
* the data between last mark and kvindex.
* This handles a special case where the key wraps around the buffer.
* If the key is to be passed to a RawComparator, then it must be
* contiguous in the buffer. This recopies the data in the buffer back
* into itself, but starting at the beginning of the buffer. Note that
* this method should <b>only</b> be called immediately after detecting
* this condition. To call it at any other time is undefined and would
* likely result in data loss or corruption.
* @see #markRecord()
*/
protected void shiftBufferedKey() throws IOException {
// spillLock unnecessary; both kvend and kvindex are current
int headbytelen = bufvoid - bufmark;
bufvoid = bufmark;
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
final int avail =
Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
if (bufindex + headbytelen < avail) {
System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
bufindex += headbytelen;
bufferRemaining -= kvbuffer.length - bufvoid;
} else {
byte[] keytmp = new byte[bufindex];
System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
bufindex = 0;
out.write(kvbuffer, bufmark, headbytelen);
out.write(keytmp);
}
}
}
public class Buffer extends OutputStream {
private final byte[] scratch = new byte[1];
@Override
public void write(int v)
throws IOException {
scratch[0] = (byte)v;
write(scratch, 0, 1);
}
/**
* Attempt to write a sequence of bytes to the collection buffer.
* This method will block if the spill thread is running and it
* cannot write.
* @throws MapBufferTooSmallException if record is too large to
* serialize into the collection buffer.
*/
@Override
public void write(byte b[], int off, int len)
throws IOException {
// must always verify the invariant that at least METASIZE bytes are
// available beyond kvindex, even when len == 0
bufferRemaining -= len;
if (bufferRemaining <= 0) {
// writing these bytes could exhaust available buffer space or fill
// the buffer to soft limit. check if spill or blocking are necessary
boolean blockwrite = false;
spillLock.lock();
try {
do {
checkSpillException();
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
// ser distance to key index
final int distkvi = distanceTo(bufindex, kvbidx);
// ser distance to spill end index
final int distkve = distanceTo(bufindex, kvbend);
// if kvindex is closer than kvend, then a spill is neither in
// progress nor complete and reset since the lock was held. The
// write should block only if there is insufficient space to
// complete the current write, write the metadata for this record,
// and write the metadata for the next record. If kvend is closer,
// then the write should block if there is too little space for
// either the metadata or the current write. Note that collect
// ensures its metadata requirement with a zero-length write
blockwrite = distkvi <= distkve
? distkvi <= len + 2 * METASIZE
: distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
if (!spillInProgress) {
if (blockwrite) {
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
// need to use meta exclusively; zero-len rec & 100% spill
// pcnt would fail
resetSpill(); // resetSpill doesn't move bufindex, kvindex
bufferRemaining = Math.min(
distkvi - 2 * METASIZE,
softLimit - distanceTo(kvbidx, bufindex)) - len;
continue;
}
// we have records we can spill; only spill if blocked
if (kvindex != kvend) {
startSpill();
// Blocked on this write, waiting for the spill just
// initiated to finish. Instead of repositioning the marker
// and copying the partial record, we set the record start
// to be the new equator
setEquator(bufmark);
} else {
// We have no buffered records, and this record is too large
// to write into kvbuffer. We must spill it directly from
// collect
final int size = distanceTo(bufstart, bufindex) + len;
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
bufvoid = kvbuffer.length;
throw new MapBufferTooSmallException(size + " bytes");
}
}
}
if (blockwrite) {
// wait for spill
try {
while (spillInProgress) {
spillDone.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOInterruptedException(
"Buffer interrupted while waiting for the writer", e);
}
}
} while (blockwrite);
} finally {
spillLock.unlock();
}
}
// here, we know that we have sufficient space to write
// int overflow possible with (bufindex + len)
long futureBufIndex = (long) bufindex + len;
if (futureBufIndex > bufvoid) {
final int gaplen = bufvoid - bufindex;
System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
len -= gaplen;
off += gaplen;
bufindex = 0;
}
System.arraycopy(b, off, kvbuffer, bufindex, len);
bufindex += len;
}
}
void interruptSpillThread() throws IOException {
assert !spillLock.isHeldByCurrentThread();
// Shut down the spill thread and wait for it to exit. Since the caller has
// ensured the thread is finished with its work (and sortAndSpill did not
// throw), we use an interrupt rather than setting a flag. Spilling from
// this thread concurrently while the spill thread drains its remaining
// work would be a useful extension, and would also be sufficient reason
// to switch to the flag-based approach.
try {
spillThread.interrupt();
spillThread.join();
} catch (InterruptedException e) {
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted");
//Reset status
Thread.currentThread().interrupt();
throw new IOInterruptedException("Spill failed", e);
}
}
@Override
public void flush() throws IOException {
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Starting flush of map output");
outputContext.notifyProgress();
if (Thread.currentThread().isInterrupted()) {
/**
* The thread may have been interrupted while flush was in progress, or
* flush may never have been invoked. As part of cleanup, TezTaskRunner
* invokes close() on all I/O; at that point it is safe to clean up here.
*/
if (cleanup) {
cleanup();
}
try {
interruptSpillThread();
} catch(IOException e) {
//safe to ignore
}
return;
}
spillLock.lock();
try {
while (spillInProgress) {
spillDone.await();
}
checkSpillException();
final int kvbend = 4 * kvend;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished
resetSpill();
}
if (kvindex != kvend) {
kvend = (kvindex + NMETA) % kvmeta.capacity();
bufend = bufmark;
if (LOG.isInfoEnabled()) {
LOG.info(
outputContext.getInputOutputVertexNames() + ": " + "Sorting & Spilling map output. "
+ "bufstart = " + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
+ "; " + "kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+ ", kvend = " + kvend + "(" + (kvend * 4) + ")"
+ ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" +
maxRec);
}
long sameKeyCount = 0;
long totalKeysCount = 0;
synchronized (this) {
sameKeyCount = sameKey;
totalKeysCount = totalKeys;
}
outputContext.notifyProgress();
sortAndSpill(sameKeyCount, totalKeysCount);
}
} catch (InterruptedException e) {
//Reset status
Thread.currentThread().interrupt();
interruptSpillThread();
throw new IOException("Interrupted while waiting for the writer", e);
} finally {
spillLock.unlock();
}
interruptSpillThread();
// release sort buffer before the merge
//FIXME
//kvbuffer = null;
try {
mergeParts();
} catch (InterruptedException e) {
cleanup();
Thread.currentThread().interrupt();
}
if (isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
}
}
@Override
public List<Event> close() throws IOException {
kvbuffer = null;
kvmeta = null;
return super.close();
}
boolean isClosed() {
return kvbuffer == null && kvmeta == null;
}
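/**
* Daemon thread that waits for spillReady, runs sortAndSpill() with the
* spill lock released, and records any failure in sortSpillException so the
* collecting thread can rethrow it.
*/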
protected class SpillThread extends Thread {
volatile long totalKeysCount;
volatile long sameKeyCount;
@Override
public void run() {
spillLock.lock();
try {
spillThreadRunning = true;
while (true) {
spillDone.signal();
while (!spillInProgress) {
spillReady.await();
}
try {
spillLock.unlock();
sortAndSpill(sameKeyCount, totalKeysCount);
} catch (Throwable t) {
LOG.warn(outputContext.getInputOutputVertexNames() + ": " + "Got an exception in sortAndSpill", t);
sortSpillException = t;
} finally {
spillLock.lock();
if (bufend < bufstart) {
bufvoid = kvbuffer.length;
}
kvstart = kvend;
bufstart = bufend;
spillInProgress = false;
}
}
} catch (InterruptedException e) {
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted");
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
spillThreadRunning = false;
}
}
public void setTotalKeysProcessed(long sameKeyCount, long totalKeysCount) {
this.sameKeyCount = sameKeyCount;
this.totalKeysCount = totalKeysCount;
}
}
private void checkSpillException() throws IOException {
final Throwable lspillException = sortSpillException;
if (lspillException != null) {
if (lspillException instanceof Error) {
final String logMsg = "Task " + outputContext.getUniqueIdentifier()
+ " failed : " + ExceptionUtils.getStackTrace(lspillException);
outputContext.fatalError(lspillException, logMsg);
}
if (lspillException instanceof InterruptedException) {
throw new IOInterruptedException("Spill failed", lspillException);
} else {
throw new IOException("Spill failed", lspillException);
}
}
}
private void startSpill() {
assert !spillInProgress;
kvend = (kvindex + NMETA) % kvmeta.capacity();
bufend = bufmark;
spillInProgress = true;
if (LOG.isInfoEnabled()) {
LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling map output. "
+ "bufstart=" + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
+"; kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+", kvend = " + kvend + "(" + (kvend * 4) + ")"
+ ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec);
}
spillThread.setTotalKeysProcessed(sameKey, totalKeys);
spillReady.signal();
}
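/**
* Boundaries of the metadata entries to sort and spill, in logical record
* positions (units of NMETA ints). Since entries are allocated backwards
* from the equator, kvend marks the most recently collected record.
*/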
int getMetaStart() {
return kvend / NMETA;
}
int getMetaEnd() {
return 1 + // kvend is a valid record
(kvstart >= kvend
? kvstart
: kvmeta.capacity() + kvstart) / NMETA;
}
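/**
* RLE in the spill's IFile pays off when a noticeable fraction (over ~10%)
* of sort comparisons saw equal keys; the sameKeyCount < 0 clause guards
* against counter overflow.
*/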
private boolean isRLENeeded(long sameKeyCount, long totalKeysCount) {
return (sameKeyCount > (0.1 * totalKeysCount)) || (sameKeyCount < 0);
}
protected void sortAndSpill(long sameKeyCount, long totalKeysCount)
throws IOException, InterruptedException {
final int mstart = getMetaStart();
final int mend = getMetaEnd();
sorter.sort(this, mstart, mend, progressable);
spill(mstart, mend, sameKeyCount, totalKeysCount);
}
private void adjustSpillCounters(long rawLen, long compLength) {
if (!isFinalMergeEnabled()) {
outputBytesWithOverheadCounter.increment(rawLen);
} else {
if (numSpills > 0) {
additionalSpillBytesWritten.increment(compLength);
// Reset; the value will be set during the final merge.
outputBytesWithOverheadCounter.setValue(0);
} else {
// Set this up for the first write only. Subsequent ones will be handled in the final merge.
outputBytesWithOverheadCounter.increment(rawLen);
}
}
}
protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount)
throws IOException, InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
final long size = (bufend >= bufstart
? bufend - bufstart
: (bufvoid - bufend) + bufstart) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename);
ensureSpillFilePermissions(filename, rfs);
int spindex = mstart;
final InMemValBytes value = createInMemValBytes();
boolean rle = isRLENeeded(sameKeyCount, totalKeysCount);
for (int i = 0; i < partitions; ++i) {
IFile.Writer writer = null;
try {
long segmentStart = out.getPos();
if (spindex < mend && kvmeta.get(offsetFor(spindex) + PARTITION) == i
|| !sendEmptyPartitionDetails) {
writer = new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
serializationContext.getValueClass(), codec, spilledRecordsCounter, null, rle);
}
if (combiner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
final int kvoff = offsetFor(spindex);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we have fewer
// than some threshold of records for a partition
if (spstart != spindex) {
TezRawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "Running combine processor");
}
runCombineProcessor(kvIter, writer);
}
}
long rawLength = 0;
long partLength = 0;
// close the writer
if (writer != null) {
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
adjustSpillCounters(rawLength, partLength);
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) {
partitionStats[i] += partLength;
}
writer = null;
} finally {
if (null != writer) writer.close();
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf, localFs);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills
+ " at " + filename.toString());
++numSpills;
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
// Increment only when there was at least one previous spill
numAdditionalSpills.increment(1);
}
} finally {
if (out != null) out.close();
}
}
/**
* Handles the degenerate case where serialization fails to fit in
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
private void spillSingleRecord(final Object key, final Object value,
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename);
ensureSpillFilePermissions(filename, rfs);
// we don't run the combiner for a single record
for (int i = 0; i < partitions; ++i) {
IFile.Writer writer = null;
try {
long segmentStart = out.getPos();
// create a writer per partition, reusing the configured codec
if (!sendEmptyPartitionDetails || (i == partition)) {
writer = new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
serializationContext.getValueClass(), codec, spilledRecordsCounter, null, false);
}
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Note that our map byte count will not be accurate with
// compression
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
long rawLength = 0;
long partLength = 0;
if (writer != null) {
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
adjustSpillCounters(rawLength, partLength);
// record offsets
TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf, localFs);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
// Increment only when there is at least one previous spill
numAdditionalSpills.increment(1);
}
} finally {
if (out != null) out.close();
}
}
protected int getInMemVBytesLength(int kvoff) {
// the value length is stored directly in the record's metadata entry
// (VALLEN), so it can be returned without locating the next entry
final int vallen = kvmeta.get(kvoff + VALLEN);
assert vallen >= 0;
return vallen;
}
/**
* Given an offset, populate vbytes with the associated set of
* deserialized value bytes. Should only be called during a spill.
*/
int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
int vallen = getInMemVBytesLength(kvoff);
vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
return vallen;
}
/**
* Inner class wrapping valuebytes, used for appendRaw.
*/
static class InMemValBytes extends DataInputBuffer {
private byte[] buffer;
private int start;
private int length;
private final int bufvoid;
public InMemValBytes(int bufvoid) {
this.bufvoid = bufvoid;
}
public void reset(byte[] buffer, int start, int length) {
this.buffer = buffer;
this.start = start;
this.length = length;
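// a value that wraps past bufvoid is copied into a fresh contiguous array
// so callers can treat it as a plain byte range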
if (start + length > bufvoid) {
this.buffer = new byte[this.length];
final int taillen = bufvoid - start;
System.arraycopy(buffer, start, this.buffer, 0, taillen);
System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
this.start = 0;
}
super.reset(this.buffer, this.start, this.length);
}
}
InMemValBytes createInMemValBytes() {
return new InMemValBytes(bufvoid);
}
protected class MRResultIterator implements TezRawKeyValueIterator {
private final DataInputBuffer keybuf = new DataInputBuffer();
private final InMemValBytes vbytes = createInMemValBytes();
private final int end;
private int current;
public MRResultIterator(int start, int end) {
this.end = end;
current = start - 1;
}
@Override
public boolean hasNext() throws IOException {
return (current + 1) < end;
}
public boolean next() throws IOException {
return ++current < end;
}
public DataInputBuffer getKey() throws IOException {
final int kvoff = offsetFor(current);
keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART));
return keybuf;
}
public DataInputBuffer getValue() throws IOException {
getVBytesForOffset(offsetFor(current), vbytes);
return vbytes;
}
public Progress getProgress() {
return null;
}
@Override
public boolean isSameKey() throws IOException {
return false;
}
public void close() { }
}
private void maybeSendEventForSpill(List<Event> events, boolean isLastEvent,
TezSpillRecord spillRecord, int index, boolean sendEvent) throws IOException {
if (isFinalMergeEnabled()) {
return;
}
Preconditions.checkArgument(spillRecord != null, "Spill record cannot be null");
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater);
LOG.info(outputContext.getInputOutputVertexNames() + ": " +
"Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
if (sendEvent) {
outputContext.sendEvents(events);
}
}
private void maybeAddEventsForSpills() throws IOException {
if (isFinalMergeEnabled()) {
return;
}
List<Event> events = Lists.newLinkedList();
for(int i=0; i<numSpills; i++) {
TezSpillRecord spillRecord = indexCacheList.get(i);
if (spillRecord == null) {
//File was already written and location is stored in spillFileIndexPaths
spillRecord = new TezSpillRecord(spillFileIndexPaths.get(i), localFs);
} else {
//Double check if this file has to be written
if (spillFileIndexPaths.get(i) == null) {
Path indexPath = mapOutputFile.getSpillIndexFileForWrite(i, partitions *
MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRecord.writeToFile(indexPath, conf, localFs);
}
}
maybeSendEventForSpill(events, (i == numSpills - 1), spillRecord, i, false);
fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(i)).getLen());
}
outputContext.sendEvents(events);
}
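/**
* Merges all spill files into the single final output and index file, or,
* when final merge is disabled, publishes per-spill events instead. A lone
* spill is promoted to the final output via rename rather than rewritten.
*/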
private void mergeParts() throws IOException, InterruptedException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
final String taskIdentifier = outputContext.getUniqueIdentifier();
for(int i = 0; i < numSpills; i++) {
filename[i] = spillFilePaths.get(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
TezSpillRecord spillRecord = null;
if (isFinalMergeEnabled()) {
finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
sameVolRename(filename[0], finalOutputFile);
if (indexCacheList.size() == 0) {
sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
spillRecord = new TezSpillRecord(finalIndexFile, localFs);
} else {
spillRecord = indexCacheList.get(0);
spillRecord.writeToFile(finalIndexFile, conf, localFs);
}
} else {
List<Event> events = Lists.newLinkedList();
// Since there is only one spill, the spill record will be present in the cache.
spillRecord = indexCacheList.get(0);
Path indexPath = mapOutputFile.getSpillIndexFileForWrite(numSpills-1, partitions *
MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRecord.writeToFile(indexPath, conf, localFs);
maybeSendEventForSpill(events, true, spillRecord, 0, true);
fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
//No need to populate finalIndexFile, finalOutputFile etc when finalMerge is disabled
}
if (spillRecord != null && reportPartitionStats()) {
for(int i=0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getPartLength();
}
}
numShuffleChunks.setValue(numSpills);
return;
}
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = spillFileIndexPaths.get(i);
indexCacheList.add(new TezSpillRecord(indexFileName, localFs));
}
// Check whether a final merge is needed; if not, exit early.
if (numSpills > 0 && !isFinalMergeEnabled()) {
maybeAddEventsForSpills();
//No need to do final merge.
return;
}
//make correction in the length to include the sequence file header
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
if (isFinalMergeEnabled()) {
finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
} else if (numSpills == 0) {
//e.g attempt_1424502260528_0119_1_07_000058_0_10012_0/file.out when final merge is
// disabled
finalOutputFile = mapOutputFile.getSpillFileForWrite(numSpills, finalOutFileSize);
finalIndexFile = mapOutputFile.getSpillIndexFileForWrite(numSpills, finalIndexFileSize);
}
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
ensureSpillFilePermissions(finalOutputFile, rfs);
if (numSpills == 0) {
// TODO Change event generation to say there is no data rather than generating a dummy file
//create dummy files
long rawLength = 0;
long partLength = 0;
TezSpillRecord sr = new TezSpillRecord(partitions);
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
if (!sendEmptyPartitionDetails) {
Writer writer =
new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), finalOut,
serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
null, null);
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
TezIndexRecord rec =
new TezIndexRecord(segmentStart, rawLength, partLength);
// Covers the case of multiple spills.
outputBytesWithOverheadCounter.increment(rawLength);
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, conf, localFs);
} finally {
finalOut.close();
}
++numSpills;
if (!isFinalMergeEnabled()) {
List<Event> events = Lists.newLinkedList();
maybeSendEventForSpill(events, true, sr, 0, true);
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
}
numShuffleChunks.setValue(numSpills);
return;
}
else {
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
boolean shouldWrite = false;
//create the segments to be merged
List<Segment> segmentList =
new ArrayList<Segment>(numSpills);
for (int i = 0; i < numSpills; i++) {
outputContext.notifyProgress();
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
if (indexRecord.hasData() || !sendEmptyPartitionDetails) {
shouldWrite = true;
DiskSegment s =
new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(),
indexRecord.getPartLength(), codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, true);
segmentList.add(s);
}
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getInputOutputVertexNames() + ": "
+ "TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
"Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
indexRecord.getRawLength() + ", " +
indexRecord.getPartLength() + ")");
}
}
int mergeFactor =
this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
// sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
serializationContext, codec,
segmentList, mergeFactor,
new Path(taskIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
progressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null); // Not using any Progress in TezMerger. Should just work.
//write merged output to disk
long segmentStart = finalOut.getPos();
long rawLength = 0;
long partLength = 0;
if (shouldWrite) {
Writer writer = new Writer(serializationContext.getKeySerialization(),
serializationContext.getValSerialization(), finalOut,
serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
spilledRecordsCounter, null);
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer,
progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
} else {
runCombineProcessor(kvIter, writer);
}
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
outputBytesWithOverheadCounter.increment(rawLength);
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
partitionStats[parts] += partLength;
}
}
numShuffleChunks.setValue(1); //final merge has happened
spillRec.writeToFile(finalIndexFile, conf, localFs);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}
}