/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
/**
* This class implements the run generator for sorting with replacement
* selection, where there is no limit on the output, i.e. the whole data set
* should be sorted. A SortMinHeap is used as the selection tree to decide the
* order in which tuples are written into the runs, while the memory manager is
* based on a binary search tree that allocates slots for tuples in memory.
* The overall process is as follows:
* <ul>
* <li>Read the input data frame by frame. For each tuple T in the current frame:
* <ul>
* <li>Try to allocate a memory slot for T along with its attached
* header/footer (used for memory management purposes).</li>
* <li>If T cannot be allocated, output as many memory-resident tuples as
* needed until a free slot large enough to hold T is created. The min-heap
* decides which tuple is sent to the output at each step.</li>
* <li>Write T into memory.</li>
* <li>Calculate the run id of T (based on the last tuple output for the
* current run); it is either the current run or the next run. Also calculate
* the Poor Man's Normalized Key (PNK) of T, to make later comparisons faster.</li>
* <li>Create a heap element for T, containing its run id, the slot pointer to
* its memory location, and its PNK, and insert it into the heap.</li>
* </ul>
* </li>
* <li>Upon closing, write all tuples still resident in memory into their
* corresponding run(s); again, the min-heap decides which tuple is output
* next.</li>
* </ul>
* OptimizedSortOperatorDescriptor will merge the generated runs to produce the
* final sorted output of the data. A minimal usage sketch is given in the
* comment at the top of the class body below.
*
* @author pouria
*/
public class OptimizedExternalSortRunGenerator implements IRunGenerator {
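/*
* A minimal usage sketch (illustrative only): this generator is normally
* driven by the surrounding sort operator, so the frame source "inputFrames"
* and the constructor arguments below are assumed to be supplied by the
* caller; the method calls match this class's IRunGenerator lifecycle.
*
*   OptimizedExternalSortRunGenerator runGen = new OptimizedExternalSortRunGenerator(
*           ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, memSize);
*   runGen.open();
*   for (ByteBuffer frame : inputFrames) {
*       runGen.nextFrame(frame);
*   }
*   runGen.close();
*   List<IFrameReader> sortedRuns = runGen.getRuns(); // runs to be merged downstream
*/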
private final IHyracksTaskContext ctx;
private final int[] sortFields;
private final INormalizedKeyComputer nkc;
private final IBinaryComparatorFactory[] comparatorFactories;
private final IBinaryComparator[] comparators;
private final RecordDescriptor recordDescriptor;
private final List<IFrameReader> runs;
private ISelectionTree sTree;
private IMemoryManager memMgr;
private final int memSize;
// Used to read tuples in nextFrame()
private FrameTupleAccessor inputAccessor;
// Used to write tuples to the dedicated output buffer
private FrameTupleAppender outputAppender;
// Dedicated output buffer for writing tuples into run(s)
private ByteBuffer outputBuffer;
// Used to read the last output record from the output buffer
private FrameTupleAccessor lastRecordAccessor;
// Holds the index of the last output tuple in the dedicated output buffer
private int lastTupleIx;
// Contains the pointer to the memory slot allocated by the memory manager for the new tuple
private Slot allocationPtr;
// Contains the pointer to the next tuple chosen by the selectionTree for output
private Slot outputedTuple;
private int[] sTreeTop;
private RunFileWriter writer;
private boolean newRun;
private int curRunId;
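/**
* Creates a run generator with the given sort fields and comparators.
* firstKeyNormalizerFactory may be null, in which case no normalized key is
* used; memSize is the in-memory budget handed to the BST-based memory manager.
*/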
public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, int memSize) {
this.ctx = ctx;
this.sortFields = sortFields;
nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
this.comparatorFactories = comparatorFactories;
comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
this.recordDescriptor = recordDesc;
this.runs = new LinkedList<IFrameReader>();
this.memSize = memSize;
}
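/**
* Sets up the frame accessors, the dedicated output buffer, the BST-based
* memory manager and the selection min-heap, and opens the writer for the
* first run.
*/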
@Override
public void open() throws HyracksDataException {
runs.clear();
inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
outputAppender = new FrameTupleAppender(ctx.getFrameSize());
outputBuffer = ctx.allocateFrame();
outputAppender.reset(outputBuffer, true);
lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
this.memMgr = new BSTMemMgr(ctx, memSize);
this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
this.allocationPtr = new Slot();
this.outputedTuple = new Slot();
this.sTreeTop = new int[] { -1, -1, -1, -1 };
curRunId = -1;
openNewRun();
}
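/**
* For each tuple T in the incoming frame: allocates a memory slot for T
* (outputting memory-resident tuples via outputRecord() until a large enough
* slot is freed, if necessary), writes T into the memory manager, computes its
* run id and PNK, and inserts the corresponding entry into the selection tree.
*/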
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
inputAccessor.reset(buffer);
byte[] bufferArray = buffer.array();
int tupleCount = inputAccessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
allocationPtr.clear();
int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
memMgr.allocate(tLength, allocationPtr);
while (allocationPtr.isNull()) {
int unAllocSize = -1;
while (unAllocSize < tLength) {
unAllocSize = outputRecord();
if (unAllocSize < 1) {
throw new HyracksDataException(
"Unable to allocate space for the new tuple, while there is no more tuple to output");
}
}
memMgr.allocate(tLength, allocationPtr);
}
memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
int runId = getRunId(inputAccessor, i);
int pnk = getPNK(inputAccessor, i, bufferArray);
int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
sTree.insert(entry);
}
}
@Override
public void fail() throws HyracksDataException {
}
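/**
* Drains the selection tree into the current run, flushes the last output
* frame, closes the run writer, registers its reader in the list of runs, and
* releases the sort memory.
*/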
@Override
public void close() throws HyracksDataException {
while (!sTree.isEmpty()) { // Output the remaining elements in the selectionTree
outputRecord();
}
if (outputAppender.getTupleCount() > 0) { // Write out the very last resident records to file
FrameUtils.flushFrame(outputBuffer, writer);
}
outputAppender.reset(outputBuffer, true);
writer.close();
runs.add(writer.createReader());
memMgr.close();
}
public List<IFrameReader> getRuns() {
return runs;
}
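/**
* Pops the minimum entry from the selection tree, switches to a new run if the
* entry belongs to the next run, appends the corresponding tuple to the output
* buffer (flushing the buffer to the current run when it is full), and frees
* the tuple's memory slot.
*
* @return the amount of space made available by unallocating the tuple's slot
*         (the value returned by the memory manager)
*/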
private int outputRecord() throws HyracksDataException {
outputedTuple.clear();
sTree.getMin(sTreeTop);
if (!isEntryValid(sTreeTop)) {
throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
}
if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch runs
openNewRun();
}
int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Cannot append to the tupleAppender
FrameUtils.flushFrame(outputBuffer, writer);
outputAppender.reset(outputBuffer, true);
if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
throw new HyracksDataException("Cannot append to the output buffer in sort");
}
lastTupleIx = 0;
} else {
lastTupleIx++;
}
outputedTuple.set(tFrameIx, tOffset);
newRun = false;
return memMgr.unallocate(outputedTuple);
}
// buffInArray is passed in by the caller for better performance, so the
// underlying byte array is not re-extracted for each and every tuple
private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
int sfIdx = sortFields[0];
int tStart = fta.getTupleStartOffset(tIx);
int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
}
// Decides the current record's runId by comparing it to the last output record
private int getRunId(FrameTupleAccessor fta, int tupIx) {
if (newRun) { // Very first record for a new run
return curRunId;
}
byte[] lastRecBuff = outputBuffer.array();
lastRecordAccessor.reset(outputBuffer);
int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
ByteBuffer fr2 = fta.getBuffer();
byte[] curRecBuff = fr2.array();
int r2StartOffset = fta.getTupleStartOffset(tupIx);
for (int f = 0; f < comparators.length; ++f) {
int fIdx = sortFields[f];
int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
int l1 = f1End - f1Start;
int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
int l2 = f2End - f2Start;
int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
if (c != 0) {
if (c <= 0) {
return curRunId;
} else {
return (curRunId + 1);
}
}
}
return curRunId;
}
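/**
* Flushes and closes the current run (if any) and registers its reader, then
* creates and opens the run file writer for the next run.
*/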
private void openNewRun() throws HyracksDataException {
if (writer != null) { // There is a previous run, so flush its tuples and close it first
if (outputAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outputBuffer, writer);
}
outputAppender.reset(outputBuffer, true);
writer.close();
runs.add(writer.createReader());
}
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
ExternalSortRunGenerator.class.getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
curRunId++;
newRun = true;
lastTupleIx = -1;
}
private boolean isEntryValid(int[] entry) {
return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
}
}