blob: 8085f986cf7f05484791be596153b3594fff15f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.sort.impl;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.library.common.comparator.HashComparator;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@SuppressWarnings({"unchecked", "rawtypes"})
public class PipelinedSorter extends ExternalSorter {
// Logger shared by the sorter and its nested helper classes.
private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
/**
* The size of each record in the index file for the map-outputs.
*/
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
// Per-partition allowance added to the spill file size estimate in spill().
private final static int APPROX_HEADER_LENGTH = 150;
// Set to bitcount(partitions) + 1 in the constructor. collect() stores the
// partition number in this many high-order bits of a record's first metadata int.
private final int partitionBits;
// Every collected record owns NMETA consecutive ints of metadata ("acct");
// the constants below are the offsets of the individual ints in that group.
private static final int PARTITION = 0; // partition offset in acct
private static final int KEYSTART = 1; // key offset in acct
private static final int VALSTART = 2; // val offset in acct
private static final int VALLEN = 3; // val len in acct
private static final int NMETA = 4; // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
// spill accounting
// NOTE(review): sortSpillException is never read or written elsewhere in this file.
volatile Throwable sortSpillException = null;
// Number of spill files written so far; incremented at the end of spill().
int numSpills = 0;
// flush() only runs the combiner on the final merge when at least this many spills exist.
private final int minSpillsForCombine;
// Non-null only when the configured comparator is a HashComparator; used by
// collect() to derive the low-order bits of the sort prefix.
private final HashComparator hasher;
// SortSpans
// The span currently receiving records from collect().
private SortSpan span;
// Backing storage that every SortSpan is carved out of; released in flush().
private ByteBuffer largeBuffer;
// Merger
private final SpanMerger merger;
// Thread pool that sorts filled spans in the background (see sort()).
private final ExecutorService sortmaster;
// NOTE(review): flush() declares a local with the same name, so this field is
// not used anywhere in this file.
private final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
// NOTE(review): never referenced in this file.
private int totalIndexCacheMemory;
// Read from configuration in the constructor; not consulted elsewhere in this file.
private int indexCacheMemoryLimit;
// TODO Set additional counters - total bytes written, spills etc.
/**
* Creates a sorter that buffers serialized records in one large in-memory
* buffer, sorts filled spans on a thread pool and spills merged runs to disk.
*
* @param outputContext passed to the superclass; also supplies the destination
*        vertex name used to name the sort threads
* @param conf configuration, read for spill percent, index cache limit,
*        sort thread count and the minimum spill count for combining
* @param numOutputs passed to the superclass
* @param initialMemoryAvailable passed to the superclass
* @throws IOException if the configured spill percent is outside (0, 1] or the
*         available sort memory in MB does not fit in 11 bits
*/
public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
long initialMemoryAvailable) throws IOException {
super(outputContext, conf, numOutputs, initialMemoryAvailable);
// One bit more than is needed to represent 'partitions'.
partitionBits = bitcount(partitions)+1;
//sanity checks
// NOTE(review): spillper is validated below but not otherwise used in this class.
final float spillper =
this.conf.getFloat(
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
final int sortmb = this.availableMemoryMb;
indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT +
"\": " + spillper);
}
// Only values in [0, 2047] pass, which keeps 'sortmb << 20' below within int range.
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "\": " + sortmb);
}
// buffers and accounting
int maxMemUsage = sortmb << 20;
// Round down to a whole number of metadata records.
maxMemUsage -= maxMemUsage % METASIZE;
largeBuffer = ByteBuffer.allocate(maxMemUsage);
LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
// TODO: configurable setting?
// Initial guess: up to 1M records of 16 bytes each; SortSpan shrinks the
// metadata area itself when the buffer is too small for that.
span = new SortSpan(largeBuffer, 1024*1024, 16);
merger = new SpanMerger(comparator);
final int sortThreads =
this.conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT);
// Daemon threads, named after the destination vertex.
sortmaster = Executors.newFixedThreadPool(sortThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Sorter [" + TezUtilsInternal
.cleanVertexName(outputContext.getDestinationVertexName()) + "] #%d")
.build());
// k/v serialization
if(comparator instanceof HashComparator) {
hasher = (HashComparator)comparator;
LOG.info("Using the HashComparator");
} else {
hasher = null;
}
// Both serializers write into the current span's data buffer.
valSerializer.open(span.out);
keySerializer.open(span.out);
minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
}
/**
 * Returns the number of bits needed to represent {@code n}, i.e. the position
 * of its highest set bit (0 for {@code n == 0}, 1 for 1, 3 for 5, ...).
 *
 * <p>For non-negative input this matches counting right-shifts until the value
 * reaches zero. A negative input is treated as an unsigned 32-bit pattern and
 * yields 32 instead of looping forever, which a signed shift would do.
 *
 * @param n the value to measure
 * @return the bit length of {@code n}
 */
private int bitcount(int n) {
  return Integer.SIZE - Integer.numberOfLeadingZeros(n);
}
/**
 * Retires the current span and installs a fresh one for subsequent records.
 *
 * <p>If the large buffer still has room behind the current span, the current
 * span is queued for background sorting and writing continues in the leftover
 * space. Otherwise the span is sorted inline, everything is spilled to disk and
 * a new span is laid out over the whole buffer, sized from the record
 * statistics of the span that was just retired.
 *
 * @throws IOException if sorting, spilling or re-opening the serializers fails
 */
public void sort() throws IOException {
  final SortSpan successor = span.next();
  if (successor != null) {
    // queue up the sort
    final Future<SpanIterator> pending =
        sortmaster.submit(new SortTask(span, sorter, comparator));
    merger.add(pending);
    span = successor;
  } else {
    // sort in the same thread, do not wait for the thread pool
    merger.add(span.sort(sorter, comparator));
    spill();
    int items = 1024*1024;
    int perItem = 16;
    final int sampled = span.length();
    if (sampled != 0) {
      perItem = span.kvbuffer.limit() / sampled;
      // our goal is to have 1M splits and sort early
      items = Math.min(largeBuffer.capacity() / (METASIZE + perItem), 1024*1024);
    }
    span = new SortSpan(largeBuffer, items, perItem);
  }
  // Point both serializers at whichever span is now current.
  valSerializer.open(span.out);
  keySerializer.open(span.out);
}
/**
 * Routes one key/value pair to the partition chosen by the partitioner and
 * buffers it via {@code collect}.
 *
 * @param key the record key
 * @param value the record value
 * @throws IOException if the record cannot be collected
 */
@Override
public void write(Object key, Object value) throws IOException {
  final int target = partitioner.getPartition(key, value, partitions);
  collect(key, value, target);
}
/**
 * Serialize the key, value to intermediate storage.
 *
 * <p>The key and value are serialized back to back into the current span's data
 * buffer and one metadata record (prefix, key start, value start, value length)
 * is appended for them. When the span runs out of metadata or data space,
 * {@link #sort()} is invoked to obtain a fresh span and the record is retried.
 *
 * <p>A record that overflows two empty spans in a row can never fit: after an
 * overflow on an empty span, {@code sort()} always lays out the same default
 * span over the whole buffer, so further retries would spill empty files
 * forever. That case is reported as an {@link IOException}.
 *
 * @param key the record key; its class must be exactly {@code keyClass}
 * @param value the record value; its class must be exactly {@code valClass}
 * @param partition the target partition, in {@code [0, partitions)}
 * @throws IOException on a type mismatch, an illegal partition, a record too
 *         large for the buffer, or a failure while sorting/spilling
 */
synchronized void collect(Object key, Object value, final int partition
    ) throws IOException {
  if (key.getClass() != keyClass) {
    throw new IOException("Type mismatch in key from map: expected "
        + keyClass.getName() + ", received "
        + key.getClass().getName());
  }
  if (value.getClass() != valClass) {
    throw new IOException("Type mismatch in value from map: expected "
        + valClass.getName() + ", received "
        + value.getClass().getName());
  }
  if (partition < 0 || partition >= partitions) {
    throw new IOException("Illegal partition for " + key + " (" +
        partition + ")");
  }
  int keystart = -1;
  int valstart = -1;
  int valend = -1;
  // true when the previous attempt overflowed a span holding no records
  boolean overflowedEmptySpan = false;
  while (true) {
    if (span.kvmeta.remaining() < METASIZE) {
      this.sort();
    }
    keystart = span.kvbuffer.position();
    try {
      keySerializer.serialize(key);
      valstart = span.kvbuffer.position();
      valSerializer.serialize(value);
      valend = span.kvbuffer.position();
      break;
    } catch (BufferOverflowException overflow) {
      final boolean spanIsEmpty = (span.kvmeta.position() == 0);
      if (spanIsEmpty && overflowedEmptySpan) {
        throw new IOException("Record too large for in-memory buffer: key/value pair"
            + " does not fit in an empty sort span of "
            + span.kvbuffer.capacity() + " bytes");
      }
      overflowedEmptySpan = spanIsEmpty;
      // discard the partially written record
      span.kvbuffer.position(keystart);
      this.sort();
      // try again with the new span
    }
  }
  int prefix = 0;
  if (hasher != null) {
    prefix = hasher.getHashCode(key);
  }
  // partition in the high partitionBits bits, hash bits below it
  prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
  /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
  span.kvmeta.put(prefix);
  span.kvmeta.put(keystart);
  span.kvmeta.put(valstart);
  span.kvmeta.put(valend - valstart);
  // track the largest key/value seen so sort() can size its scratch arrays
  if ((valstart - keystart) > span.keymax) {
    span.keymax = (valstart - keystart);
  }
  if ((valend - valstart) > span.valmax) {
    span.valmax = (valend - valstart);
  }
  mapOutputRecordCounter.increment(1);
  mapOutputByteCounter.increment(valend - keystart);
}
/**
 * Writes everything currently held by the merger to a new spill file, one
 * IFile segment per partition, followed by the matching spill index file.
 *
 * <p>{@code numSpills} is only incremented after both files were written. If
 * the sorted spans cannot all be collected, or the thread is interrupted, an
 * {@link IOException} is thrown so that callers never continue with a
 * silently missing or incomplete spill.
 *
 * @throws IOException if waiting for the sort results fails or is interrupted,
 *         or if writing the spill or index file fails
 */
public void spill() throws IOException {
  // create spill file
  // Widen before adding: the buffer capacity alone may be close to
  // Integer.MAX_VALUE, so the sum can overflow an int.
  final long size = (long) largeBuffer.capacity() +
      ((long) partitions * APPROX_HEADER_LENGTH);
  final TezSpillRecord spillRec = new TezSpillRecord(partitions);
  final Path filename =
      mapOutputFile.getSpillFileForWrite(numSpills, size);
  FSDataOutputStream out = rfs.create(filename, true, 4096);
  try {
    // wait for all the future results from sort threads
    if (!merger.ready()) {
      throw new IOException("Failed to collect sorted spans for spill " + numSpills
          + "; refusing to write an incomplete spill to " + filename);
    }
    LOG.info("Spilling to " + filename.toString());
    for (int i = 0; i < partitions; ++i) {
      TezRawKeyValueIterator kvIter = merger.filter(i);
      //write merged output to disk
      long segmentStart = out.getPos();
      Writer writer =
          new Writer(conf, out, keyClass, valClass, codec,
              spilledRecordsCounter, null, merger.needsRLE());
      if (combiner == null) {
        while (kvIter.next()) {
          writer.append(kvIter.getKey(), kvIter.getValue());
        }
      } else {
        runCombineProcessor(kvIter, writer);
      }
      //close
      writer.close();
      // record offsets
      final TezIndexRecord rec =
          new TezIndexRecord(
              segmentStart,
              writer.getRawLength(),
              writer.getCompressedLength());
      spillRec.putIndex(rec, i);
    }
    Path indexFilename =
        mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
            * MAP_OUTPUT_INDEX_RECORD_LENGTH);
    // TODO: cache
    spillRec.writeToFile(indexFilename, conf);
    ++numSpills;
  } catch (InterruptedException ie) {
    // Keep the interrupt visible to callers and fail the spill instead of
    // returning as if it had been written.
    Thread.currentThread().interrupt();
    throw new IOException("Interrupted while spilling to " + filename, ie);
  } finally {
    out.close();
  }
}
/**
 * Sorts and spills whatever is still buffered, then produces the final output
 * file and its index.
 *
 * <p>With exactly one spill the spill file and index are simply renamed into
 * place. Otherwise the spills are merged partition by partition into a single
 * output file (running the combiner when enough spills exist), the final index
 * is written and the intermediate spill files are deleted.
 *
 * @throws IOException if sorting, spilling, merging or any file operation fails
 */
@Override
public void flush() throws IOException {
  final String uniqueIdentifier = outputContext.getUniqueIdentifier();
  Path finalOutputFile =
      mapOutputFile.getOutputFileForWrite(0); //TODO
  Path finalIndexFile =
      mapOutputFile.getOutputIndexFileForWrite(0); //TODO
  LOG.info("Starting flush of map output");
  span.end();
  merger.add(span.sort(sorter, comparator));
  spill();
  sortmaster.shutdown();
  largeBuffer = null;
  if (numSpills == 1) {
    // someday be able to pass this directly to shuffle
    // without writing to disk
    final Path filename =
        mapOutputFile.getSpillFile(0);
    Path indexFilename =
        mapOutputFile.getSpillIndexFile(0);
    sameVolRename(filename, mapOutputFile.getOutputFileForWriteInVolume(filename));
    sameVolRename(indexFilename, mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename));
    return;
  }
  // The merge factor does not change between partitions; read it once.
  final int mergeFactor =
      this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
          TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
  //The output stream for the final single output file
  FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
  try {
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    // Named differently from the indexCacheList field to avoid shadowing it.
    final ArrayList<TezSpillRecord> spillIndexes = new ArrayList<TezSpillRecord>();
    for (int i = 0; i < numSpills; i++) {
      // TODO: build this cache before
      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
      TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
      spillIndexes.add(spillIndex);
    }
    for (int parts = 0; parts < partitions; parts++) {
      //create the segments to be merged
      List<Segment> segmentList =
          new ArrayList<Segment>(numSpills);
      for (int i = 0; i < numSpills; i++) {
        Path spillFilename = mapOutputFile.getSpillFile(i);
        TezIndexRecord indexRecord = spillIndexes.get(i).getIndex(parts);
        Segment s =
            new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
                indexRecord.getPartLength(), codec, ifileReadAhead,
                ifileReadAheadLength, ifileBufferSize, true);
        segmentList.add(i, s);
      }
      // sort the segments only if there are intermediate merges
      boolean sortSegments = segmentList.size() > mergeFactor;
      //merge
      TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
          keyClass, valClass, codec,
          segmentList, mergeFactor,
          new Path(uniqueIdentifier),
          (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
          nullProgressable, sortSegments, true,
          null, spilledRecordsCounter, null,
          null); // Not using any Progress in TezMerger. Should just work.
      //write merged output to disk
      long segmentStart = finalOut.getPos();
      Writer writer =
          new Writer(conf, finalOut, keyClass, valClass, codec,
              spilledRecordsCounter, null, merger.needsRLE());
      if (combiner == null || numSpills < minSpillsForCombine) {
        TezMerger.writeFile(kvIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
      } else {
        runCombineProcessor(kvIter, writer);
      }
      //close
      writer.close();
      // record offsets
      final TezIndexRecord rec =
          new TezIndexRecord(
              segmentStart,
              writer.getRawLength(),
              writer.getCompressedLength());
      spillRec.putIndex(rec, parts);
    }
    spillRec.writeToFile(finalIndexFile, conf);
  } finally {
    // Always release the output stream, including when a merge step throws.
    finalOut.close();
  }
  // The spills are only removed once the final file and index are complete.
  for (int i = 0; i < numSpills; i++) {
    Path indexFilename = mapOutputFile.getSpillIndexFile(i);
    Path spillFilename = mapOutputFile.getSpillFile(i);
    rfs.delete(indexFilename,true);
    rfs.delete(spillFilename,true);
  }
}
/**
* Does nothing. The sort thread pool is shut down and the large buffer is
* released in flush(), not here.
*/
public void close() { }
/**
* A raw key/value iterator that can also report a partition value for the
* record it is currently positioned on.
* NOTE(review): the implementations in this file return the full sort prefix
* (partition in the high bits plus hash bits), not the bare partition number;
* PartitionFilter shifts it down before comparing.
*/
private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
// Partition value of the current record (see note above).
int getPartition();
}
/**
 * Adapts a {@link ByteBuffer} to the {@link OutputStream} API so serializers can
 * write straight into it. Writes past the buffer's limit surface as the
 * buffer's own {@code BufferOverflowException}.
 */
private class BufferStreamWrapper extends OutputStream
{
  private final ByteBuffer sink;

  public BufferStreamWrapper(ByteBuffer out) {
    this.sink = out;
  }

  @Override
  public void write(int b) throws IOException {
    final byte lowByte = (byte) b;
    sink.put(lowByte);
  }

  @Override
  public void write(byte[] b) throws IOException {
    sink.put(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    sink.put(b, off, len);
  }
}
/**
 * A {@link DataInputBuffer} that owns its bytes: every reset copies the
 * requested region into a private array, which grows on demand and is then
 * reused, so the content stays valid after the source moves on.
 */
protected class InputByteBuffer extends DataInputBuffer {
  private byte[] buffer = new byte[256];
  private ByteBuffer wrapped = ByteBuffer.wrap(buffer);

  /** Ensures the private array can hold {@code length} bytes. */
  private void resize(int length) {
    if (length > buffer.length) {
      buffer = new byte[length];
      wrapped = ByteBuffer.wrap(buffer);
    }
    wrapped.limit(length);
  }

  /**
   * Copies {@code length} bytes starting at absolute index {@code start} of
   * {@code b} and exposes them as this buffer's content.
   * Note that this moves the position of {@code b}.
   */
  public void reset(ByteBuffer b, int start, int length) {
    resize(length);
    b.position(start);
    b.get(buffer, 0, length);
    super.reset(buffer, 0, length);
  }

  // clone-ish function
  /**
   * Copies the unread bytes of {@code clone} into this buffer.
   *
   * <p>{@code DataInputBuffer.getLength()} is the end index of the valid data,
   * not a byte count, so the number of bytes to copy is the distance from the
   * current position to that end. For a source positioned at 0 the two are
   * the same value.
   */
  public void reset(DataInputBuffer clone) {
    byte[] data = clone.getData();
    int start = clone.getPosition();
    int length = clone.getLength() - start;
    resize(length);
    System.arraycopy(data, start, buffer, 0, length);
    super.reset(buffer, 0, length);
  }
}
/**
 * One contiguous region of the large buffer, split into a metadata area
 * ({@code kvmeta}, NMETA ints per record) followed by a data area
 * ({@code kvbuffer}, the serialized keys and values). A span is filled by
 * {@code collect()}, then sorted in place by reordering its metadata records.
 */
private class SortSpan implements IndexedSortable {
  final IntBuffer kvmeta;
  final ByteBuffer kvbuffer;
  final DataOutputStream out;
  private RawComparator comparator;
  // scratch arrays used by swap()
  final int imeta[] = new int[NMETA];
  final int jmeta[] = new int[NMETA];
  // largest key / value length collected into this span
  int keymax = 1;
  int valmax = 1;
  // scratch arrays used by compare(); allocated in sort() from keymax
  private byte[] ki;
  private byte[] kj;
  private int index = 0;
  private InputByteBuffer hay = new InputByteBuffer();
  // number of compare() calls that found two equal keys
  private long eq = 0;

  /**
   * Carves a span out of {@code source}.
   *
   * @param source the buffer region to use; its remaining bytes bound the span
   * @param maxItems the number of records to reserve metadata for
   * @param perItem the expected serialized size of one record, in bytes
   */
  public SortSpan(ByteBuffer source, int maxItems, int perItem) {
    int capacity = source.remaining();
    int metasize = METASIZE*maxItems;
    int dataSize = maxItems * perItem;
    if (capacity < (metasize+dataSize)) {
      // try to allocate less meta space, because we have sample data
      metasize = METASIZE*(capacity/(perItem+METASIZE));
    }
    ByteBuffer reserved = source.duplicate();
    reserved.mark();
    LOG.info("reserved.remaining() = "+reserved.remaining());
    LOG.info("reserved.size = "+metasize);
    // data area: everything after the metadata area
    reserved.position(metasize);
    kvbuffer = reserved.slice();
    // metadata area: the first metasize bytes, viewed as ints
    reserved.flip();
    reserved.limit(metasize);
    kvmeta = reserved
        .slice()
        .order(ByteOrder.nativeOrder())
        .asIntBuffer();
    out = new DataOutputStream(
        new BufferStreamWrapper(kvbuffer));
  }

  /**
   * Sorts the records of this span and returns an iterator over them.
   * Remembers {@code comparator} for later {@code compare} and
   * {@code compareInternal} calls.
   */
  public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
    this.comparator = comparator;
    ki = new byte[keymax];
    kj = new byte[keymax];
    LOG.info("begin sorting Span"+index + " ("+length()+")");
    if (length() > 1) {
      sorter.sort(this, 0, length(), nullProgressable);
    }
    LOG.info("done sorting Span"+index);
    return new SpanIterator(this);
  }

  /** Index into {@code kvmeta} of the first metadata int of record {@code i}. */
  int offsetFor(int i) {
    return (i * NMETA);
  }

  /** Exchanges the metadata records {@code mi} and {@code mj}; data bytes stay put. */
  public void swap(final int mi, final int mj) {
    final int kvi = offsetFor(mi);
    final int kvj = offsetFor(mj);
    kvmeta.position(kvi); kvmeta.get(imeta);
    kvmeta.position(kvj); kvmeta.get(jmeta);
    kvmeta.position(kvj); kvmeta.put(imeta);
    kvmeta.position(kvi); kvmeta.put(jmeta);
  }

  /**
   * Orders two records by their prefix word first and by raw key second.
   */
  public int compare(final int mi, final int mj) {
    final int kvi = offsetFor(mi);
    final int kvj = offsetFor(mj);
    final int kvip = kvmeta.get(kvi + PARTITION);
    final int kvjp = kvmeta.get(kvj + PARTITION);
    // sort by partition
    if (kvip != kvjp) {
      // Prefixes written by collect() keep the top bit clear (partitionBits is
      // one more than the partition number needs), so this cannot overflow.
      return kvip - kvjp;
    }
    final int istart = kvmeta.get(kvi + KEYSTART);
    final int jstart = kvmeta.get(kvj + KEYSTART);
    final int ilen = kvmeta.get(kvi + VALSTART) - istart;
    final int jlen = kvmeta.get(kvj + VALSTART) - jstart;
    kvbuffer.position(istart);
    kvbuffer.get(ki, 0, ilen);
    kvbuffer.position(jstart);
    kvbuffer.get(kj, 0, jlen);
    // sort by key
    final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
    if (cmp == 0) eq++;
    return cmp;
  }

  /**
   * Closes this span and returns a new span over the unused tail of its data
   * area, or {@code null} when this span is empty or the tail is too small.
   */
  public SortSpan next() {
    ByteBuffer remaining = end();
    if (remaining != null) {
      int items = length();
      int perItem = kvbuffer.position()/items;
      SortSpan newSpan = new SortSpan(remaining, items, perItem);
      newSpan.index = index+1;
      return newSpan;
    }
    return null;
  }

  /** Number of records, derived from the metadata limit (exact once {@link #end()} ran). */
  public int length() {
    return kvmeta.limit()/NMETA;
  }

  /**
   * Freezes this span by setting both limits to the current write positions.
   *
   * @return the unused tail of the data area, or {@code null} if the span holds
   *         no records or the tail cannot hold one more average-sized record
   */
  public ByteBuffer end() {
    ByteBuffer remaining = kvbuffer.duplicate();
    remaining.position(kvbuffer.position());
    remaining = remaining.slice();
    kvbuffer.limit(kvbuffer.position());
    kvmeta.limit(kvmeta.position());
    int items = length();
    if (items == 0) {
      return null;
    }
    int perItem = kvbuffer.position()/items;
    LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
    if (remaining.remaining() < METASIZE+perItem) {
      return null;
    }
    return remaining;
  }

  /**
   * Compares record {@code index} of this span against an external key.
   *
   * @param needle the key to compare against
   * @param needlePart the prefix word that belongs to {@code needle}
   * @param index the record of this span to compare
   * @return negative, zero or positive as this span's record sorts before,
   *         equal to or after the needle
   */
  private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
    int cmp = 0;
    int keystart;
    int valstart;
    int partition;
    // Use this span's own offsetFor rather than reaching through the
    // sorter's current span, which is a different, still-changing object.
    final int base = offsetFor(index);
    partition = kvmeta.get(base + PARTITION);
    if (partition != needlePart) {
      cmp = (partition-needlePart);
    } else {
      keystart = kvmeta.get(base + KEYSTART);
      valstart = kvmeta.get(base + VALSTART);
      // hay is allocated ahead of time
      hay.reset(kvbuffer, keystart, valstart - keystart);
      cmp = comparator.compare(hay.getData(),
          hay.getPosition(), hay.getLength(),
          needle.getData(),
          needle.getPosition(), needle.getLength());
    }
    return cmp;
  }

  /** Number of equal-key comparisons seen by {@link #compare(int, int)}. */
  public long getEq() {
    return eq;
  }

  @Override
  public String toString() {
    return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
  }
}
/**
 * Iterates over the records of one sorted {@link SortSpan} in metadata order.
 * Instances are comparable by their current record so they can sit in the
 * merger's priority queue.
 */
private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
  // index of the current record; -1 until the first next()
  private int kvindex = -1;
  // index of the last record (-1 for an empty span)
  private int maxindex;
  private IntBuffer kvmeta;
  private ByteBuffer kvbuffer;
  private SortSpan span;
  private InputByteBuffer key = new InputByteBuffer();
  private InputByteBuffer value = new InputByteBuffer();
  private Progress progress = new Progress();
  // bisect() is not attempted for runs shorter than this
  private final int minrun = (1 << 4);

  public SpanIterator(SortSpan span) {
    this.kvmeta = span.kvmeta;
    this.kvbuffer = span.kvbuffer;
    this.span = span;
    this.maxindex = (kvmeta.limit()/NMETA) - 1;
  }

  /** Returns the key of the current record, copied into a reused buffer. */
  public DataInputBuffer getKey() throws IOException {
    final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
    final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
    key.reset(kvbuffer, keystart, valstart - keystart);
    return key;
  }

  /** Returns the value of the current record, copied into a reused buffer. */
  public DataInputBuffer getValue() throws IOException {
    final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
    final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
    value.reset(kvbuffer, valstart, vallen);
    return value;
  }

  /**
   * Advances to the next record.
   *
   * @return {@code false} if the iterator is already on the last record (or
   *         the span is empty), {@code true} otherwise
   */
  public boolean next() throws IOException {
    // caveat: since we use this as a comparable in the merger
    if (kvindex == maxindex) return false;
    if (kvindex % 100 == 0) {
      // Fraction of records consumed so far. The division must be done in
      // floating point; in int arithmetic it only ever yields -1 or 0.
      progress.set((float) kvindex / maxindex);
    }
    kvindex += 1;
    return true;
  }

  public void close() throws IOException {
  }

  public Progress getProgress() {
    return progress;
  }

  /** Returns the prefix word (partition in the high bits) of the current record. */
  public int getPartition() {
    final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
    return partition;
  }

  /** Number of records after the current one. */
  public int size() {
    return (maxindex - kvindex);
  }

  /**
   * Orders iterators by their current records (prefix word, then key).
   * Falls back to -1 if the other iterator's key cannot be read.
   */
  public int compareTo(SpanIterator other) {
    try {
      return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
    } catch (IOException ie) {
      // since we're not reading off disk, how could getKey() throw exceptions?
    }
    return -1;
  }

  @Override
  public String toString() {
    return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
  }

  /**
   * bisect returns the next insertion point for a given raw key, skipping keys
   * which are <= needle using a binary search instead of a linear comparison.
   * This is massively efficient when long strings of identical keys occur.
   * @param needle the key to search for
   * @param needlePart the prefix word that belongs to {@code needle}
   * @return the number of records, counted from the current one, that may be
   *         consumed before the needle; 0 when no shortcut was found
   */
  int bisect(DataInputBuffer needle, int needlePart) {
    int start = kvindex;
    int end = maxindex-1;
    int mid = start;
    int cmp = 0;
    if (end - start < minrun) {
      return 0;
    }
    if (span.compareInternal(needle, needlePart, start) > 0) {
      return kvindex;
    }
    // bail out early if we haven't got a min run
    if (span.compareInternal(needle, needlePart, start+minrun) > 0) {
      return 0;
    }
    if (span.compareInternal(needle, needlePart, end) < 0) {
      return end - kvindex;
    }
    boolean found = false;
    // we sort 100k items, the max it can do is 20 loops, but break early
    for (int i = 0; start < end && i < 16; i++) {
      mid = start + (end - start)/2;
      cmp = span.compareInternal(needle, needlePart, mid);
      if (cmp == 0) {
        start = mid;
        found = true;
      } else if (cmp < 0) {
        start = mid;
        found = true;
      }
      if (cmp > 0) {
        end = mid;
      }
    }
    if (found) {
      return start - kvindex;
    }
    return 0;
  }
}
/**
 * Background job that sorts one span and yields an iterator over the result;
 * submitted to the sort thread pool.
 */
private class SortTask implements Callable<SpanIterator> {
  private final SortSpan target;
  private final IndexedSorter algorithm;
  private final RawComparator keyOrder;

  public SortTask(SortSpan sortable,
      IndexedSorter sorter, RawComparator comparator) {
    this.target = sortable;
    this.algorithm = sorter;
    this.keyOrder = comparator;
  }

  /** Sorts the span on the calling thread and returns its iterator. */
  public SpanIterator call() {
    final SpanIterator sorted = target.sort(algorithm, keyOrder);
    return sorted;
  }
}
/**
 * Restricts a partition-ordered iterator to the records of one partition at a
 * time. When a record of a different partition is reached it is kept as a
 * pending look-ahead, so that it is re-examined instead of skipped once the
 * filter is reset to another partition.
 */
private class PartitionFilter implements TezRawKeyValueIterator {
  private final PartitionedRawKeyValueIterator iter;
  private int partition;
  // true while the underlying iterator sits on a record that was not returned yet
  private boolean dirty = false;

  public PartitionFilter(PartitionedRawKeyValueIterator iter) {
    this.iter = iter;
  }

  public DataInputBuffer getKey() throws IOException {
    return iter.getKey();
  }

  public DataInputBuffer getValue() throws IOException {
    return iter.getValue();
  }

  public void close() throws IOException {
  }

  public Progress getProgress() {
    return new Progress();
  }

  /**
   * Moves to the next record of the selected partition.
   *
   * @return {@code true} if the underlying iterator now sits on a record of
   *         the selected partition, {@code false} if it is exhausted or sits
   *         on a record of another partition
   */
  public boolean next() throws IOException {
    // Only advance when there is no pending look-ahead record.
    if (!dirty && !iter.next()) {
      return false;
    }
    final int owner = iter.getPartition() >>> (32 - partitionBits);
    if (owner == partition) {
      dirty = false; // we found what we were looking for, good
      return true;
    }
    dirty = true; // we did a lookahead and failed to find partition
    return false;
  }

  /** Selects the partition whose records {@link #next()} will accept. */
  public void reset(int partition) {
    this.partition = partition;
  }

  public int getPartition() {
    return this.partition;
  }
}
/**
 * Priority queue of span iterators, ordered by {@code SpanIterator.compareTo},
 * with an initial capacity of 256.
 */
private class SpanHeap extends PriorityQueue<SpanIterator> {
  public SpanHeap() {
    super(256);
  }

  /**
   * {@link PriorityQueue}.poll() by a different name
   * @return the head of the queue, or {@code null} if the queue is empty
   */
  public SpanIterator pop() {
    final SpanIterator head = poll();
    return head;
  }
}
/**
* Merges the sorted spans into one ordered stream of records. Spans arrive
* either as ready iterators or as futures from the sort pool; ready() turns the
* futures into iterators and places them on a heap ordered by current record.
*/
private class SpanMerger implements PartitionedRawKeyValueIterator {
// NOTE(review): stored but not used inside this class.
private final RawComparator comparator;
// Copies of the most recently returned record (filled in next()).
InputByteBuffer key = new InputByteBuffer();
InputByteBuffer value = new InputByteBuffer();
int partition;
// Sort results that have not been collected yet.
private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>();
private SpanHeap heap = new SpanHeap();
// Single reusable per-partition view handed out by filter().
private PartitionFilter partIter;
// Number of further records that may be taken from 'horse' without consulting the heap.
private int gallop = 0;
// The iterator returned by the previous pop().
private SpanIterator horse;
// Sums accumulated in ready(): span lengths and equal-key comparison counts.
private long total = 0;
// NOTE(review): never updated or read in this class.
private long count = 0;
private long eq = 0;
public SpanMerger(RawComparator comparator) {
this.comparator = comparator;
partIter = new PartitionFilter(this);
}
// Advances the iterator by one record and puts it on the heap unless it is exhausted.
public void add(SpanIterator iter) throws IOException{
if(iter.next()) {
heap.add(iter);
}
}
// Registers a pending sort result; it is collected later by ready().
public void add(Future<SpanIterator> iter) throws IOException{
this.futures.add(iter);
}
/**
* Waits for every pending sort, adds the resulting iterators to the heap and
* logs the heap contents.
* @return true on success; false if anything threw, in which case the
* exception is only logged
* NOTE(review): the broad catch below also catches InterruptedException, so
* the declared InterruptedException is never actually propagated from here and
* the interrupt status is not restored.
*/
public boolean ready() throws IOException, InterruptedException {
try {
SpanIterator iter = null;
while(this.futures.size() > 0) {
Future<SpanIterator> futureIter = this.futures.remove(0);
iter = futureIter.get();
this.add(iter);
}
StringBuilder sb = new StringBuilder();
// NOTE(review): total and eq are never reset, so iterators that are on the
// heap during several ready() calls are counted each time.
for(SpanIterator sp: heap) {
sb.append(sp.toString());
sb.append(",");
total += sp.span.length();
eq += sp.span.getEq();
}
LOG.info("Heap = " + sb.toString());
return true;
} catch(Exception e) {
LOG.info(e.toString());
return false;
}
}
// Returns the iterator holding the next record in merge order, or null when
// the heap is empty.
private SpanIterator pop() throws IOException {
// While galloping, keep returning the same iterator without touching the heap.
if(gallop > 0) {
gallop--;
return horse;
}
SpanIterator current = heap.pop();
SpanIterator next = heap.peek();
// The same iterator won twice in a row and another one is waiting: ask it how
// far it can run before reaching the runner-up's current key.
if(next != null && current != null &&
((Object)horse) == ((Object)current)) {
// TODO: a better threshold check
gallop = current.bisect(next.getKey(), next.getPartition())-1;
}
horse = current;
return current;
}
// True when equal-key comparisons exceed 10% of the summed span lengths.
public boolean needsRLE() {
return (eq > 0.1 * total);
}
// NOTE(review): not called anywhere in this file.
private SpanIterator peek() throws IOException {
if(gallop > 0) {
return horse;
}
return heap.peek();
}
/**
* Moves to the next record in merge order and copies its key, value and
* partition word into this merger's own buffers.
* @return false when no records are left
*/
public boolean next() throws IOException {
SpanIterator current = pop();
if(current != null) {
// keep local copies, since add() will move it all out
key.reset(current.getKey());
value.reset(current.getValue());
partition = current.getPartition();
if(gallop <= 0) {
// add() advances the iterator and re-inserts it unless it is exhausted
this.add(current);
} else {
// galloping
current.next();
}
return true;
}
return false;
}
public DataInputBuffer getKey() throws IOException { return key; }
public DataInputBuffer getValue() throws IOException { return value; }
public int getPartition() { return partition; }
public void close() throws IOException {
}
public Progress getProgress() {
// TODO
return new Progress();
}
// Points the shared PartitionFilter at the given partition and returns it;
// the same filter object is returned on every call.
public TezRawKeyValueIterator filter(int partition) {
partIter.reset(partition);
return partIter;
}
}
}