blob: 25442b3ff600c78f0789b09c68c59249a46f678d [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
/**
* @author pouria
* This class implements the run generator for sorting with replacement
* selection, where there is a limit on the output, i.e. we are looking
* for top-k tuples (first k smallest tuples w.r.t sorting keys).
* A SortMinMaxHeap is used as the selectionTree to decide the order of
* writing tuples into the runs, and also to prune tuples (if possible).
* Memory manager is based on a binary search tree and is used to
* allocate memory slots for tuples.
* The overall process is as follows (Assuming that the limit is K):
* - Read the input data frame by frame. For each tuple T in the current
* frame:
* - If currentRun R has reached the limit of K on the size, and (T >
* maximum tuple of R), then ignore T.
* - Otherwise, try to allocate a memory slot for writing T along with
* the attached header/footer (for memory management purpose)
* - If T can not be allocated, try to output as many tuples, currently
* resident in memory, as needed so that a free slot, large enough to
* hold T, gets created. MinMaxHeap decides about which tuple should be
* sent to the output at each step.
* - Write T into memory.
* - Calculate the runID of T (based on the last output tuple for the
* current run). It is either the current run or the next run. Also
* calculate the Poor man's Normalized Key (PNK) for T, to make comparisons
* faster later.
* - Create a heap element for T, containing its runID, the slot ptr to
* its memory location, and its PNK.
* - If runID is the nextRun, insert the heap element into the heap, and
* increment the size of nextRun.
* - If runID is the currentRun, then:
* - If currentRun has not hit the limit of k, insert the element into
* the heap, and increase currentRun size. - Otherwise, currentRun has
* hit the limit of K, while T is less than the max. So discard the
* current max for the current run (by popping it from the heap and
* unallocating its memory location) and insert the heap element into
* the heap. No need to change the currentRun size as we are replacing
* an old element (the old max) with T.
* - Upon closing, write all the tuples, currently resident in memory,
* into their corresponding run(s).
* - Note that upon opening a new Run R, if size of R (based on stats)
* is S and (S > K), then (S-K) current maximum tuples of R (which are
* resident in memory) get discarded at the beginning. MinMax heap can
* be used to find these tuples.
*/
public class OptimizedExternalSortRunGeneratorWithLimit implements IRunGenerator {

    private final IHyracksTaskContext ctx;
    // Indexes of the sort fields, in priority order (first entry is the primary key).
    private final int[] sortFields;
    // Normalized-key computer for the first sort field; null when no factory was supplied.
    private final INormalizedKeyComputer nkc;
    private final IBinaryComparatorFactory[] comparatorFactories;
    // One comparator per sort field, created from the factories above.
    private final IBinaryComparator[] comparators;
    private final RecordDescriptor recordDescriptor;
    // Readers over the generated runs, in creation order; exposed via getRuns().
    private final List<IFrameReader> runs;
    // Min-max selection tree (SortMinMaxHeap) deciding output order and enabling pruning.
    private ISelectionTree sTree;
    // Binary-search-tree-based memory manager that allocates slots for resident tuples.
    private IMemoryManager memMgr;
    // Size of the in-memory workspace handed to BSTMemMgr (presumably in frames — TODO confirm).
    private final int memSize;
    private FrameTupleAccessor inputAccessor; // Used to read input tuples in nextFrame()
    private FrameTupleAppender outputAppender; // Used to write tuples to the dedicated output buffer
    private ByteBuffer outputBuffer; // Dedicated output buffer for writing tuples into run(s)
    private FrameTupleAccessor lastRecordAccessor; // Used to read the last output record from the output buffer
    private FrameTupleAccessor fta2; // Used to read the in-memory (max) record in compareRecords()
    // The limit K: maximum number of tuples allowed per run.
    private final int outputLimit;
    // Number of tuples assigned to the current run so far.
    private int curRunSize;
    // Number of tuples already assigned to the next (not-yet-opened) run.
    private int nextRunSize;
    private int lastTupleIx; // Index of the last output tuple within the dedicated output buffer (-1 if none)
    private Slot allocationPtr; // Ptr to the memory slot allocated by the memory manager for the new tuple
    private Slot outputedTuple; // Ptr to the next tuple chosen by the selectionTree to output
    private Slot discard; // Reusable ptr for slots being unallocated (discarded maxima)
    // Reusable selection-tree entries, laid out as { runId, frameIx, offset, pnk }
    // per the SortMinMaxHeap.*_IX constants; { -1, -1, -1, -1 } means "invalid/empty".
    private int[] sTreeTop;
    private int[] peek;
    // Writer for the run currently being produced; null until open() creates the first run.
    private RunFileWriter writer;
    // True from openNewRun() until the first record of that run is output;
    // getRunId() uses it to anchor the very first record of each run.
    private boolean newRun;
    private int curRunId;

    /**
     * @param ctx task context used for frame allocation and run-file creation
     * @param sortFields indexes of the fields to sort on, in priority order
     * @param firstKeyNormalizerFactory optional factory for the first key's
     *            normalized-key computer; may be null (PNK is then always 0)
     * @param comparatorFactories one comparator factory per sort field
     * @param recordDesc descriptor of the input records
     * @param memSize size of the in-memory workspace given to the memory manager
     * @param limit the output limit K (maximum number of tuples per run)
     */
    public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx, int[] sortFields,
            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
            RecordDescriptor recordDesc, int memSize, int limit) {
        this.ctx = ctx;
        this.sortFields = sortFields;
        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
        this.comparatorFactories = comparatorFactories;
        comparators = new IBinaryComparator[comparatorFactories.length];
        for (int i = 0; i < comparatorFactories.length; ++i) {
            comparators[i] = comparatorFactories[i].createBinaryComparator();
        }
        this.recordDescriptor = recordDesc;
        this.runs = new LinkedList<IFrameReader>();
        this.memSize = memSize;
        this.outputLimit = limit;
    }

    /**
     * Initializes all per-task state (accessors, output buffer, memory manager,
     * selection tree, run counters) and opens the first run.
     */
    @Override
    public void open() throws HyracksDataException {
        runs.clear();
        inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
        outputAppender = new FrameTupleAppender(ctx.getFrameSize());
        outputBuffer = ctx.allocateFrame();
        outputAppender.reset(outputBuffer, true);
        lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
        this.memMgr = new BSTMemMgr(ctx, memSize);
        this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
        this.allocationPtr = new Slot();
        this.outputedTuple = new Slot();
        this.sTreeTop = new int[] { -1, -1, -1, -1 };
        this.peek = new int[] { -1, -1, -1, -1 };
        this.discard = new Slot();
        // curRunId starts at -1 so that the openNewRun() below brings it to 0.
        curRunId = -1;
        curRunSize = 0;
        nextRunSize = 0;
        openNewRun();
    }

    /**
     * Processes one input frame, tuple by tuple, per the algorithm in the class
     * comment: prune tuples larger than the current run's max when the run is
     * full, otherwise allocate memory (outputting resident tuples to make room
     * if needed), assign a run id, and insert the tuple into the selection tree
     * — possibly replacing the current run's maximum.
     */
    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        inputAccessor.reset(buffer);
        byte[] bufferArray = buffer.array();
        int tupleCount = inputAccessor.getTupleCount();
        for (int i = 0; i < tupleCount; i++) {
            // Prune: current run already holds K tuples and T >= its maximum.
            if (curRunSize >= outputLimit) {
                sTree.peekMax(peek);
                if (isEntryValid(peek)
                        && compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
                                peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
                    continue;
                }
            }
            // Allocate a memory slot for T; if allocation fails, keep outputting
            // resident tuples until a large-enough slot has been freed.
            allocationPtr.clear();
            int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
            memMgr.allocate(tLength, allocationPtr);
            while (allocationPtr.isNull()) {
                int unAllocSize = -1;
                while (unAllocSize < tLength) {
                    unAllocSize = outputRecord();
                    if (unAllocSize < 1) {
                        throw new HyracksDataException(
                                "Unable to allocate space for the new tuple, while there is no more tuple to output");
                    }
                }
                memMgr.allocate(tLength, allocationPtr);
            }
            // bufferArray is hoisted out of getPNK so the ByteBuffer is not
            // converted for each and every tuple.
            int pnk = getPNK(inputAccessor, i, bufferArray);
            int runId = getRunId(inputAccessor, i);
            if (runId != curRunId) { // tuple belongs to the next run
                memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
                int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
                sTree.insert(entry);
                nextRunSize++;
                continue;
            }
            // Belongs to the current run, which still has room for T.
            if (curRunSize < outputLimit) {
                memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
                int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
                sTree.insert(entry);
                curRunSize++;
                continue;
            }
            // Current run is full; T only goes in if it beats the run's maximum.
            sTree.peekMax(peek);
            if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
                continue;
            }
            // Replace the max: pop it, free its slot, and insert T in its place.
            // curRunSize stays the same since this is a one-for-one replacement.
            sTree.getMax(peek);
            discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
            memMgr.unallocate(discard);
            memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
            int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
            sTree.insert(entry);
        }
    }

    /**
     * No-op: partially-written run files are managed workspace files and are
     * presumably cleaned up by the joblet context — TODO confirm.
     */
    @Override
    public void fail() throws HyracksDataException {
    }

    /**
     * Drains every remaining element of the selection tree into its run, flushes
     * the last partially-filled output frame, closes the final run (registering
     * its reader), and releases the memory manager.
     */
    @Override
    public void close() throws HyracksDataException {
        while (!sTree.isEmpty()) { // Outputting remaining elements in the selectionTree
            outputRecordForClose();
        }
        if (outputAppender.getTupleCount() > 0) { // Writing out very last resident records to file
            FrameUtils.flushFrame(outputBuffer, writer);
        }
        writer.close();
        runs.add(writer.createReader());
        memMgr.close();
    }

    /** @return readers over all generated runs, in creation order */
    public List<IFrameReader> getRuns() {
        return runs;
    }

    /**
     * Outputs the selection tree's minimum tuple to make memory available.
     * If the minimum belongs to the current run it is appended to the output
     * buffer (flushing a full buffer to the run file first) and its slot is
     * freed. If it belongs to the next run, the current run is closed and a new
     * one opened; any excess maxima of the new run (beyond the limit K) are
     * discarded first, and only if none were discarded is the minimum itself
     * output.
     *
     * @return the number of bytes freed in the memory manager (the largest
     *         single freed slot when several maxima are discarded)
     * @throws HyracksDataException if the tree top is invalid or the tuple
     *             cannot be appended even to a freshly-reset output buffer
     */
    private int outputRecord() throws HyracksDataException {
        outputedTuple.clear();
        sTree.getMin(sTreeTop);
        if (!isEntryValid(sTreeTop)) {
            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
        }
        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
        if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
            // Minimum belongs to the current run: append it to the output buffer,
            // flushing first if the appender is full.
            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not append to the tupleAppender
                FrameUtils.flushFrame(outputBuffer, writer);
                outputAppender.reset(outputBuffer, true);
                if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
                    throw new HyracksDataException("Can not append to the ouput buffer in sort");
                }
                lastTupleIx = 0;
            } else {
                lastTupleIx++;
            }
            outputedTuple.set(tFrameIx, tOffset);
            newRun = false;
            return memMgr.unallocate(outputedTuple);
        }
        // Minimum belongs to the next Run
        openNewRun();
        // The new run may already hold more than K tuples (see class comment):
        // discard its (size - K) current maxima, freeing their slots.
        int popCount = curRunSize - outputLimit;
        int l = 0;
        int maxFreedSpace = 0;
        for (int p = 0; p < popCount; p++) {
            sTree.getMax(peek);
            if (!isEntryValid(peek)) {
                throw new HyracksDataException("Invalid Maximum extracted from MinMaxHeap");
            }
            discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
            l = memMgr.unallocate(discard);
            if (l > maxFreedSpace) {
                maxFreedSpace = l;
            }
            curRunSize--;
        }
        if (maxFreedSpace != 0) {
            return maxFreedSpace;
        }
        // No max discarded (We just flushed out the prev run, so the output
        // buffer should be clear): output the minimum itself.
        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not append to the tupleAppender
            throw new HyracksDataException("Can not append to the ouput buffer in sort");
        }
        lastTupleIx = 0;
        outputedTuple.set(tFrameIx, tOffset);
        newRun = false;
        return memMgr.unallocate(outputedTuple);
    }

    /**
     * Close-time variant of outputRecord(): writes the tree minimum to its run
     * (opening a new run when the minimum belongs to the next one). Note that
     * it does not unallocate the tuple's slot — at close time all memory is
     * reclaimed wholesale by memMgr.close().
     *
     * @throws HyracksDataException if the tree top is invalid or the tuple
     *             cannot be appended even to a freshly-reset output buffer
     */
    private void outputRecordForClose() throws HyracksDataException {
        sTree.getMin(sTreeTop);
        if (!isEntryValid(sTreeTop)) {
            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
        }
        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
        if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
            openNewRun();
        }
        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not append to the tupleAppender
            FrameUtils.flushFrame(outputBuffer, writer);
            outputAppender.reset(outputBuffer, true);
            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
                throw new HyracksDataException("Can not append to the ouput buffer in sort");
            }
        }
    }

    /**
     * Computes the Poor man's Normalized Key (PNK) of a tuple's first sort
     * field, used to speed up later comparisons.
     *
     * @param fta accessor over the frame holding the tuple
     * @param tIx index of the tuple within the frame
     * @param buffInArray the frame's backing array, hoisted out by the caller
     *            for better performance (not converting for each and every tuple)
     * @return the normalized key, or 0 when no normalized-key computer exists
     */
    private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
        int sfIdx = sortFields[0];
        int tStart = fta.getTupleStartOffset(tIx);
        int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
        int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
        int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
        return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
    }

    /**
     * Decides which run a tuple belongs to by comparing it, field by field,
     * against the last record output to the current run: if the tuple sorts
     * before that record it must go to the next run (curRunId + 1), otherwise
     * it can still join the current run.
     *
     * @param fta accessor over the frame holding the tuple
     * @param tupIx index of the tuple within the frame
     * @return curRunId or curRunId + 1
     */
    private int getRunId(FrameTupleAccessor fta, int tupIx) {
        if (newRun) { // Very first record for a new run — no last output to compare against
            return curRunId;
        }
        byte[] lastRecBuff = outputBuffer.array();
        lastRecordAccessor.reset(outputBuffer);
        int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
        ByteBuffer fr2 = fta.getBuffer();
        byte[] curRecBuff = fr2.array();
        int r2StartOffset = fta.getTupleStartOffset(tupIx);
        for (int f = 0; f < comparators.length; ++f) {
            int fIdx = sortFields[f];
            // Field boundaries come from the 4-byte field-end slots at the start
            // of each tuple (field f starts where field f-1 ends).
            int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
            int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
            int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
            int l1 = f1End - f1Start;
            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
            int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
            int l2 = f2End - f2Start;
            int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
            if (c != 0) {
                if (c <= 0) { // last output < current tuple: current run is fine
                    return curRunId;
                } else { // last output > current tuple: must go to the next run
                    return (curRunId + 1);
                }
            }
        }
        // All sort fields equal to the last output record: current run.
        return curRunId;
    }

    /**
     * Compares an input tuple against a tuple resident in the memory manager.
     * Returns negative when the first sorts before the second (first&lt;sec : -1),
     * zero when equal on all sort fields, positive otherwise.
     *
     * @param fta1 accessor over the frame holding the first tuple
     * @param ix1 index of the first tuple within its frame
     * @param fix2 memory-manager frame index of the second tuple
     * @param offset2 offset of the second tuple's slot within that frame
     */
    private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2, int offset2) {
        ByteBuffer buff1 = fta1.getBuffer();
        byte[] recBuff1 = buff1.array();
        int offset1 = fta1.getTupleStartOffset(ix1);
        // Resident tuples carry a memory-management header; skip past it.
        offset2 += BSTNodeUtil.HEADER_SIZE;
        ByteBuffer buff2 = memMgr.getFrame(fix2);
        fta2.reset(buff2);
        byte[] recBuff2 = buff2.array();
        for (int f = 0; f < comparators.length; ++f) {
            int fIdx = sortFields[f];
            int f1Start = fIdx == 0 ? 0 : buff1.getInt(offset1 + (fIdx - 1) * 4);
            int f1End = buff1.getInt(offset1 + fIdx * 4);
            int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
            int l1 = f1End - f1Start;
            int f2Start = fIdx == 0 ? 0 : buff2.getInt(offset2 + (fIdx - 1) * 4);
            int f2End = buff2.getInt(offset2 + fIdx * 4);
            int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
            int l2 = f2End - f2Start;
            int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    /**
     * Closes the current run (if any) after flushing its pending output, adds
     * its reader to {@code runs}, then creates and opens a fresh run file.
     * Rolls the run counters over: the next run becomes the current one.
     */
    private void openNewRun() throws HyracksDataException {
        if (writer != null) { // There is a prev run, so flush its tuples and close it first
            if (outputAppender.getTupleCount() > 0) {
                FrameUtils.flushFrame(outputBuffer, writer);
            }
            outputAppender.reset(outputBuffer, true);
            writer.close();
            runs.add(writer.createReader());
        }
        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
                ExternalSortRunGenerator.class.getSimpleName());
        writer = new RunFileWriter(file, ctx.getIOManager());
        writer.open();
        curRunId++;
        newRun = true;
        // Tuples previously tagged for the "next" run now belong to this run.
        curRunSize = nextRunSize;
        nextRunSize = 0;
        lastTupleIx = -1;
    }

    /**
     * @return true iff all of the entry's run id, frame index, and offset are
     *         set (i.e. none is the -1 sentinel)
     */
    private boolean isEntryValid(int[] entry) {
        return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
    }
}