blob: a221118bd905d907196425fbbbde3cf86c944fb5 [file] [log] [blame]
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
import java.util.Arrays;
import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
/**
* @author pouria
* Implements a minimum binary heap, used as selection tree, for sort
* with replacement. This heap structure can only be used as the min
* heap (no access to the max element). Elements in the heap are
* compared based on their run numbers, and sorting key(s):
* Considering two heap elements A and B:
* if RunNumber(A) > RunNumber(B) then A is larger than B if
* RunNumber(A) == RunNumber(B), then A is smaller than B, if and only
* if the value of the sort key(s) in B is greater than A (based on the
* sort comparator).
*/
public class SortMinHeap implements ISelectionTree {
static final int RUN_ID_IX = 0;
static final int FRAME_IX = 1;
static final int OFFSET_IX = 2;
private static final int PNK_IX = 3;
private static final int ELEMENT_SIZE = 4;
private static final int INIT_ARRAY_SIZE = 512;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
private final RecordDescriptor recordDescriptor;
private final FrameTupleAccessor fta1;
private final FrameTupleAccessor fta2;
private int[] elements;
private int nextIx;
private final IMemoryManager memMgr;
private int[] top; // Used as a temp variable to access the top, to avoid object creation
public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, IMemoryManager memMgr) {
this.sortFields = sortFields;
this.comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
this.comparators[i] = comparatorFactories[i].createBinaryComparator();
}
this.recordDescriptor = recordDesc;
fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
this.memMgr = memMgr;
this.top = new int[ELEMENT_SIZE];
Arrays.fill(top, -1);
this.elements = new int[INIT_ARRAY_SIZE];
Arrays.fill(elements, -1);
this.nextIx = 0;
}
/*
* Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
*/
@Override
public void getMin(int[] result) {
if (nextIx == 0) {
result[0] = result[1] = result[2] = result[3] = -1;
return;
}
top = delete(0);
for (int i = 0; i < top.length; i++) {
result[i] = top[i];
}
}
@Override
public void peekMin(int[] result) {
if (nextIx == 0) {
result[0] = result[1] = result[2] = result[3] = -1;
return;
}
for (int i = 0; i < ELEMENT_SIZE; i++) {
result[i] = elements[i];
}
}
@Override
public void insert(int[] e) {
if (nextIx >= elements.length) {
elements = Arrays.copyOf(elements, elements.length * 2);
}
for (int i = 0; i < ELEMENT_SIZE; i++) {
elements[nextIx + i] = e[i];
}
siftUp(nextIx);
nextIx += ELEMENT_SIZE;
}
@Override
public void reset() {
Arrays.fill(elements, -1);
nextIx = 0;
}
@Override
public boolean isEmpty() {
return (nextIx < ELEMENT_SIZE);
}
public int _debugGetSize() {
return (nextIx > 0 ? (nextIx - 1) / 4 : 0);
}
private int[] delete(int nix) {
int[] nv = Arrays.copyOfRange(elements, nix, nix + ELEMENT_SIZE);
int[] lastElem = removeLast();
if (nextIx == 0) {
return nv;
}
for (int i = 0; i < ELEMENT_SIZE; i++) {
elements[nix + i] = lastElem[i];
}
int pIx = getParent(nix);
if (pIx > -1 && (compare(lastElem, Arrays.copyOfRange(elements, pIx, pIx + ELEMENT_SIZE)) < 0)) {
siftUp(nix);
} else {
siftDown(nix);
}
return nv;
}
private int[] removeLast() {
if (nextIx < ELEMENT_SIZE) { //this is the very last element
return new int[] { -1, -1, -1, -1 };
}
int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
nextIx -= ELEMENT_SIZE;
return l;
}
private void siftUp(int nodeIx) {
int p = getParent(nodeIx);
if (p < 0) {
return;
}
while (p > -1 && (compare(nodeIx, p) < 0)) {
swap(p, nodeIx);
nodeIx = p;
p = getParent(nodeIx);
if (p < 0) { // We are at the root
return;
}
}
}
private void siftDown(int nodeIx) {
int mix = getMinOfChildren(nodeIx);
if (mix < 0) {
return;
}
while (mix > -1 && (compare(mix, nodeIx) < 0)) {
swap(mix, nodeIx);
nodeIx = mix;
mix = getMinOfChildren(nodeIx);
if (mix < 0) { // We hit the leaf level
return;
}
}
}
// first < sec : -1
private int compare(int nodeSIx1, int nodeSIx2) {
int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE);
int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE);
return (compare(n1, n2));
}
// first < sec : -1
private int compare(int[] n1, int[] n2) {
// Compare Run Numbers
if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
}
// Compare Poor man Normalized Keys
if (n1[PNK_IX] != n2[PNK_IX]) {
return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
}
return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
}
private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset) {
byte[] b1 = fr1.array();
byte[] b2 = fr2.array();
fta1.reset(fr1);
fta2.reset(fr2);
int headerLen = BSTNodeUtil.HEADER_SIZE;
r1StartOffset += headerLen;
r2StartOffset += headerLen;
for (int f = 0; f < comparators.length; ++f) {
int fIdx = sortFields[f];
int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
int l1 = f1End - f1Start;
int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
int l2 = f2End - f2Start;
int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
if (c != 0) {
return c;
}
}
return 0;
}
private int getMinOfChildren(int nix) { // returns index of min child
int lix = getLeftChild(nix);
if (lix < 0) {
return -1;
}
int rix = getRightChild(nix);
if (rix < 0) {
return lix;
}
return ((compare(lix, rix) < 0) ? lix : rix);
}
//Assumption: n1Ix and n2Ix are starting indices of two elements
private void swap(int n1Ix, int n2Ix) {
int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
for (int i = 0; i < ELEMENT_SIZE; i++) {
elements[n1Ix + i] = elements[n2Ix + i];
elements[n2Ix + i] = temp[i];
}
}
private int getLeftChild(int ix) {
int lix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + ELEMENT_SIZE;
return ((lix < nextIx) ? lix : -1);
}
private int getRightChild(int ix) {
int rix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
return ((rix < nextIx) ? rix : -1);
}
private int getParent(int ix) {
if (ix <= 0) {
return -1;
}
return ((ix - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
}
private ByteBuffer getFrame(int frameIx) {
return (memMgr.getFrame(frameIx));
}
@Override
public void getMax(int[] result) {
throw new IllegalStateException("getMax() method not applicable to Min Heap");
}
@Override
public void peekMax(int[] result) {
throw new IllegalStateException("getMax() method not applicable to Min Heap");
}
}