blob: b51132eee17a564e32fed654e95a9ae53f3cd7ab [file] [log] [blame]
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
/**
* @author pouria This class defines the logic for merging the run, generated
* during the first phase of external sort (for both sorting without
* replacement selection and with it). For the case with replacement
* selection, this code also takes the limit on the output into account
* (if specified). If number of input runs is less than the available
* memory frames, then merging can be done in one pass, by allocating
* one buffer per run, and one buffer as the output buffer. A
* priorityQueue is used to find the top tuple at each iteration, among
* all the runs' heads in memory (check RunMergingFrameReader for more
* details). Otherwise, assuming that we have R runs and M memory
* buffers, where (R > M), we first merge first (M-1) runs and create a
* new sorted run, out of them. Discarding the first (M-1) runs, now
* merging procedure gets applied recursively on the (R-M+2) remaining
* runs using the M memory buffers. For the case of replacement
* selection, if outputLimit is specified, once the final pass is done
* on the runs (which is the pass that generates the final sorted
* output), as soon as the output size hits the output limit, the
* process stops, closes, and returns.
*/
public class ExternalSortRunMerger {
private final IHyracksTaskContext ctx;
private final List<IFrameReader> runs;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
private final RecordDescriptor recordDesc;
private final int framesLimit;
private final IFrameWriter writer;
private List<ByteBuffer> inFrames;
private ByteBuffer outFrame;
private FrameTupleAppender outFrameAppender;
private FrameSorter frameSorter; // Used in External sort, no replacement
// selection
private FrameTupleAccessor outFrameAccessor; // Used in External sort, with
// replacement selection
private final int outputLimit; // Used in External sort, with replacement
// selection and limit on output size
private int currentSize; // Used in External sort, with replacement
// selection and limit on output size
// Constructor for external sort, no replacement selection
public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
IFrameWriter writer) {
this.ctx = ctx;
this.frameSorter = frameSorter;
this.runs = new LinkedList<IFrameReader>(runs);
this.sortFields = sortFields;
this.comparators = comparators;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
this.outputLimit = -1;
}
// Constructor for external sort with replacement selection
public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
this.ctx = ctx;
this.runs = new LinkedList<IFrameReader>(runs);
this.sortFields = sortFields;
this.comparators = comparators;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
this.outputLimit = outputLimit;
this.currentSize = 0;
this.frameSorter = null;
}
public void process() throws HyracksDataException {
writer.open();
try {
if (runs.size() <= 0) {
if (frameSorter != null && frameSorter.getFrameCount() > 0) {
frameSorter.flushFrames(writer);
}
/** recycle sort buffer */
frameSorter.close();
} else {
/** recycle sort buffer */
frameSorter.close();
inFrames = new ArrayList<ByteBuffer>();
outFrame = ctx.allocateFrame();
outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
outFrameAppender.reset(outFrame, true);
for (int i = 0; i < framesLimit - 1; ++i) {
inFrames.add(ctx.allocateFrame());
}
int maxMergeWidth = framesLimit - 1;
while (runs.size() > maxMergeWidth) {
int generationSeparator = 0;
while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
runs.size() - maxMergeWidth + 1);
FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
.getSimpleName());
IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
mergeResultWriter.open();
IFrameReader[] runCursors = new RunFileReader[mergeWidth];
for (int i = 0; i < mergeWidth; i++) {
runCursors[i] = runs.get(generationSeparator + i);
}
merge(mergeResultWriter, runCursors);
runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
}
}
if (!runs.isEmpty()) {
IFrameReader[] runCursors = new RunFileReader[runs.size()];
for (int i = 0; i < runCursors.length; i++) {
runCursors[i] = runs.get(i);
}
merge(writer, runCursors);
}
}
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
} finally {
writer.close();
}
}
private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
recordDesc);
merger.open();
try {
while (merger.nextFrame(outFrame)) {
FrameUtils.flushFrame(outFrame, mergeResultWriter);
}
} finally {
merger.close();
}
}
public void processWithReplacementSelection() throws HyracksDataException {
writer.open();
try {
outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
outFrame = ctx.allocateFrame();
outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
outFrameAppender.reset(outFrame, true);
if (runs.size() == 1) {
if (outputLimit < 1) {
runs.get(0).open();
ByteBuffer nextFrame = ctx.allocateFrame();
while (runs.get(0).nextFrame(nextFrame)) {
FrameUtils.flushFrame(nextFrame, writer);
outFrameAppender.reset(nextFrame, true);
}
return;
}
// Limit on the output size
int totalCount = 0;
runs.get(0).open();
FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
ByteBuffer nextFrame = ctx.allocateFrame();
while (totalCount <= outputLimit && runs.get(0).nextFrame(nextFrame)) {
fta.reset(nextFrame);
int tupCount = fta.getTupleCount();
if ((totalCount + tupCount) < outputLimit) {
FrameUtils.flushFrame(nextFrame, writer);
totalCount += tupCount;
continue;
}
// The very last buffer, which exceeds the limit
int copyCount = outputLimit - totalCount;
outFrameAppender.reset(outFrame, true);
for (int i = 0; i < copyCount; i++) {
if (!outFrameAppender.append(fta, i)) {
throw new IllegalStateException();
}
totalCount++;
}
}
if (outFrameAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
outFrameAppender.reset(outFrame, true);
}
return;
}
// More than one run, actual merging is needed
inFrames = new ArrayList<ByteBuffer>();
for (int i = 0; i < framesLimit - 1; ++i) {
inFrames.add(ctx.allocateFrame());
}
while (runs.size() > 0) {
try {
doPassWithReplacementSelection(runs);
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
} finally {
writer.close();
}
}
// creates a new run from runs that can fit in memory.
private void doPassWithReplacementSelection(List<IFrameReader> runs) throws HyracksDataException {
FileReference newRun = null;
IFrameWriter writer = this.writer;
boolean finalPass = false;
if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
finalPass = true;
for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
inFrames.remove(i);
}
} else {
newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
writer = new RunFileWriter(newRun, ctx.getIOManager());
writer.open();
}
try {
IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
for (int i = 0; i < inFrames.size(); i++) {
runCursors[i] = runs.get(i);
}
RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
comparators, recordDesc);
merger.open();
try {
while (merger.nextFrame(outFrame)) {
if (outputLimit > 0 && finalPass) {
outFrameAccessor.reset(outFrame);
int count = outFrameAccessor.getTupleCount();
if ((currentSize + count) > outputLimit) {
ByteBuffer b = ctx.allocateFrame();
FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
partialAppender.reset(b, true);
int copyCount = outputLimit - currentSize;
for (int i = 0; i < copyCount; i++) {
partialAppender.append(outFrameAccessor, i);
currentSize++;
}
FrameUtils.makeReadable(b);
FrameUtils.flushFrame(b, writer);
break;
} else {
FrameUtils.flushFrame(outFrame, writer);
currentSize += count;
}
} else {
FrameUtils.flushFrame(outFrame, writer);
}
}
} finally {
merger.close();
}
if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
runs.clear();
return;
}
runs.subList(0, inFrames.size()).clear();
if (!finalPass) {
runs.add(0, ((RunFileWriter) writer).createReader());
}
} finally {
if (!finalPass) {
writer.close();
}
}
}
}