blob: dcaf7819ccda7dae5ba06c56e3f1ebc6666ed0e1 [file] [log] [blame]
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
import java.util.Arrays;
import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
/**
 * Min-max binary heap used as the selection tree in replacement-selection
 * sorting. Check SortMinHeap for details on comparing elements.
 * <p>
 * The heap is stored in a flat {@code int[]}: every node occupies
 * {@code ELEMENT_SIZE} consecutive ints holding, in order, the run id, the
 * frame index, the tuple offset inside that frame, and a poor man's
 * normalized key (PNK). All node "indexes" used internally are offsets into
 * that array and are therefore multiples of {@code ELEMENT_SIZE}. Nodes at an
 * even depth (the root is at depth 0) are on min levels; nodes at an odd depth
 * are on max levels. Elements are ordered by run id, then by PNK compared as
 * an unsigned int, then by the configured sort fields of the referenced tuples.
 * <p>
 * Query methods fill a caller-supplied {@code int[]} of at least
 * {@code ELEMENT_SIZE} entries; when the requested element does not exist
 * every entry is set to -1.
 *
 * @author pouria
 */
public class SortMinMaxHeap implements ISelectionTree {
    static final int RUN_ID_IX = 0;
    static final int FRAME_IX = 1;
    static final int OFFSET_IX = 2;
    private static final int PNK_IX = 3;
    private static final int NOT_EXIST = -1;
    private static final int ELEMENT_SIZE = 4;
    private static final int INIT_ARRAY_SIZE = 512;

    private final int[] sortFields;
    private final IBinaryComparator[] comparators;
    private final RecordDescriptor recordDescriptor;
    // Scratch accessors, reset on every tuple comparison.
    private final FrameTupleAccessor fta1;
    private final FrameTupleAccessor fta2;
    // Flat heap storage; node k occupies elements[k * ELEMENT_SIZE, (k + 1) * ELEMENT_SIZE).
    private int[] elements;
    // Offset of the first unused slot, i.e. (number of nodes) * ELEMENT_SIZE.
    private int nextIx;
    private final IMemoryManager memMgr;

    /**
     * Creates an empty heap.
     *
     * @param ctx                 context supplying the frame size for the tuple accessors
     * @param sortFields          field indexes compared, in order, when run id and PNK tie
     * @param comparatorFactories one comparator factory per entry of {@code sortFields}
     * @param recordDesc          layout of the tuples referenced by heap elements
     * @param memMgr              source of the frames referenced by heap elements
     */
    public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
            RecordDescriptor recordDesc, IMemoryManager memMgr) {
        this.sortFields = sortFields;
        this.comparators = new IBinaryComparator[comparatorFactories.length];
        for (int i = 0; i < comparatorFactories.length; ++i) {
            this.comparators[i] = comparatorFactories[i].createBinaryComparator();
        }
        this.recordDescriptor = recordDesc;
        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
        this.memMgr = memMgr;
        this.elements = new int[INIT_ARRAY_SIZE];
        Arrays.fill(elements, NOT_EXIST);
        this.nextIx = 0;
    }

    /**
     * Adds an element (its first {@code ELEMENT_SIZE} ints are copied) and
     * restores the min-max heap order. The backing array doubles when full.
     */
    @Override
    public void insert(int[] element) {
        if (nextIx + ELEMENT_SIZE > elements.length) {
            int oldLength = elements.length;
            elements = Arrays.copyOf(elements, oldLength * 2);
            // Keep unused slots at NOT_EXIST, as the constructor and reset() do.
            Arrays.fill(elements, oldLength, elements.length, NOT_EXIST);
        }
        System.arraycopy(element, 0, elements, nextIx, ELEMENT_SIZE);
        nextIx += ELEMENT_SIZE;
        bubbleUp(nextIx - ELEMENT_SIZE);
    }

    /** Removes the minimum element and copies it into {@code result} (all -1 if empty). */
    @Override
    public void getMin(int[] result) {
        if (nextIx < ELEMENT_SIZE) {
            setNotExist(result);
            return;
        }
        int[] minElem = delete(0);
        System.arraycopy(minElem, 0, result, 0, ELEMENT_SIZE);
    }

    /** Discards all elements; the backing array keeps its current capacity. */
    @Override
    public void reset() {
        Arrays.fill(elements, NOT_EXIST);
        nextIx = 0;
    }

    @Override
    public boolean isEmpty() {
        return (nextIx < ELEMENT_SIZE);
    }

    /** Copies the minimum element into {@code result} without removing it (all -1 if empty). */
    @Override
    public void peekMin(int[] result) {
        if (nextIx < ELEMENT_SIZE) {
            setNotExist(result);
            return;
        }
        // The root of a min-max heap is the minimum.
        System.arraycopy(elements, 0, result, 0, ELEMENT_SIZE);
    }

    /** Removes the maximum element and copies it into {@code result} (all -1 if empty). */
    @Override
    public void getMax(int[] result) {
        int maxIx = getMaxIx();
        if (maxIx == NOT_EXIST) {
            setNotExist(result);
            return;
        }
        int[] maxElem = delete(maxIx);
        System.arraycopy(maxElem, 0, result, 0, ELEMENT_SIZE);
    }

    /** Copies the maximum element into {@code result} without removing it (all -1 if empty). */
    @Override
    public void peekMax(int[] result) {
        int maxIx = getMaxIx();
        if (maxIx == NOT_EXIST) {
            setNotExist(result);
            return;
        }
        System.arraycopy(elements, maxIx, result, 0, ELEMENT_SIZE);
    }

    /** Marks a query result as "no element" by setting its first ELEMENT_SIZE entries to -1. */
    private static void setNotExist(int[] result) {
        Arrays.fill(result, 0, ELEMENT_SIZE, NOT_EXIST);
    }

    /**
     * Returns the offset of the maximum element, or NOT_EXIST when the heap is
     * empty. With one node the root is the maximum; otherwise the maximum is
     * the larger of the root's children (the first max level).
     */
    private int getMaxIx() {
        if (nextIx < ELEMENT_SIZE) {
            return NOT_EXIST;
        }
        if (nextIx == ELEMENT_SIZE) {
            return 0;
        }
        int lc = getLeftChild(0);
        int rc = getRightChild(0);
        if (rc == NOT_EXIST) {
            return lc;
        }
        return compare(lc, rc) < 0 ? rc : lc;
    }

    /**
     * Removes the node at {@code delIx} by moving the last node into its place
     * and trickling it down. Only called for the root or one of its children,
     * positions where trickling down alone restores heap order.
     *
     * @return a copy of the removed node, or null if the heap is empty
     */
    private int[] delete(int delIx) {
        if (nextIx < ELEMENT_SIZE) {
            return null;
        }
        if (nextIx == ELEMENT_SIZE) {
            return removeLast();
        }
        int[] delEntry = Arrays.copyOfRange(elements, delIx, delIx + ELEMENT_SIZE);
        int lastIx = nextIx - ELEMENT_SIZE;
        int[] last = removeLast();
        if (delIx != lastIx) {
            System.arraycopy(last, 0, elements, delIx, ELEMENT_SIZE);
            trickleDown(delIx);
        }
        return delEntry;
    }

    /** Removes and returns the last node, or an all -1 entry if the heap is empty. */
    private int[] removeLast() {
        if (nextIx < ELEMENT_SIZE) {
            return new int[] { NOT_EXIST, NOT_EXIST, NOT_EXIST, NOT_EXIST };
        }
        int lastIx = nextIx - ELEMENT_SIZE;
        int[] last = Arrays.copyOfRange(elements, lastIx, nextIx);
        Arrays.fill(elements, lastIx, nextIx, NOT_EXIST);
        nextIx = lastIx;
        return last;
    }

    /** Moves a newly appended node at {@code ix} up until min-max heap order holds. */
    private void bubbleUp(int ix) {
        int p = getParentIx(ix);
        if (isAtMinLevel(ix)) {
            // A min-level node larger than its (max-level) parent belongs on the max levels.
            if (p != NOT_EXIST && compare(p, ix) < 0) {
                swap(ix, p);
                bubbleUpMax(p);
            } else {
                bubbleUpMin(ix);
            }
        } else {
            // A max-level node smaller than its (min-level) parent belongs on the min levels.
            if (p != NOT_EXIST && compare(ix, p) < 0) {
                swap(ix, p);
                bubbleUpMin(p);
            } else {
                bubbleUpMax(ix);
            }
        }
    }

    /** Bubbles {@code ix} up along max levels (grandparent hops) while it exceeds its grandparent. */
    private void bubbleUpMax(int ix) {
        int gp = getGrandParent(ix);
        if (gp != NOT_EXIST && compare(gp, ix) < 0) {
            swap(ix, gp);
            bubbleUpMax(gp);
        }
    }

    /** Bubbles {@code ix} up along min levels (grandparent hops) while it is below its grandparent. */
    private void bubbleUpMin(int ix) {
        int gp = getGrandParent(ix);
        if (gp != NOT_EXIST && compare(ix, gp) < 0) {
            swap(ix, gp);
            bubbleUpMin(gp);
        }
    }

    /** Restores heap order below {@code ix}, using the min or max variant for its level. */
    private void trickleDown(int ix) {
        if (isAtMinLevel(ix)) {
            trickleDownMin(ix);
        } else {
            trickleDownMax(ix);
        }
    }

    /** Pushes the node at max-level {@code ix} down until it dominates its descendants. */
    private void trickleDownMax(int ix) {
        int maxIx = getMaxOfDescendents(ix);
        if (maxIx == NOT_EXIST) {
            return;
        }
        if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) {
            // The largest descendant is a grandchild (same kind of level as ix).
            if (compare(ix, maxIx) < 0) {
                swap(maxIx, ix);
                // The demoted value may now be smaller than its min-level parent.
                int p = getParentIx(maxIx);
                if (p != NOT_EXIST && compare(maxIx, p) < 0) {
                    swap(maxIx, p);
                }
                trickleDownMax(maxIx);
            }
        } else {
            // The largest descendant is a child; it has no children of its own.
            if (compare(ix, maxIx) < 0) {
                swap(ix, maxIx);
            }
        }
    }

    /** Pushes the node at min-level {@code ix} down until it is below all its descendants. */
    private void trickleDownMin(int ix) {
        int minIx = getMinOfDescendents(ix);
        if (minIx == NOT_EXIST) {
            return;
        }
        if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) {
            // The smallest descendant is a grandchild (same kind of level as ix).
            if (compare(minIx, ix) < 0) {
                swap(minIx, ix);
                // The demoted value may now be larger than its max-level parent.
                int p = getParentIx(minIx);
                if (p != NOT_EXIST && compare(p, minIx) < 0) {
                    swap(minIx, p);
                }
                trickleDownMin(minIx);
            }
        } else {
            // The smallest descendant is a child; it has no children of its own.
            if (compare(minIx, ix) < 0) {
                swap(ix, minIx);
            }
        }
    }

    /** Returns the offset of the smallest child or grandchild of {@code ix}, or NOT_EXIST if it has none. */
    private int getMinOfDescendents(int ix) {
        int lc = getLeftChild(ix);
        if (lc == NOT_EXIST) {
            return NOT_EXIST;
        }
        int rc = getRightChild(ix);
        if (rc == NOT_EXIST) {
            // Without a right child the left child cannot have children either.
            return lc;
        }
        int min = (compare(lc, rc) < 0) ? lc : rc;
        int[] lgc = getLeftGrandChildren(ix);
        int[] rgc = getRightGrandChildren(ix);
        for (int k = 0; k < 2; k++) {
            if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
                min = lgc[k];
            }
            if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
                min = rgc[k];
            }
        }
        return min;
    }

    /** Returns the offset of the largest child or grandchild of {@code ix}, or NOT_EXIST if it has none. */
    private int getMaxOfDescendents(int ix) {
        int lc = getLeftChild(ix);
        if (lc == NOT_EXIST) {
            return NOT_EXIST;
        }
        int rc = getRightChild(ix);
        if (rc == NOT_EXIST) {
            // Without a right child the left child cannot have children either.
            return lc;
        }
        int max = (compare(lc, rc) < 0) ? rc : lc;
        int[] lgc = getLeftGrandChildren(ix);
        int[] rgc = getRightGrandChildren(ix);
        for (int k = 0; k < 2; k++) {
            if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
                max = lgc[k];
            }
            if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
                max = rgc[k];
            }
        }
        return max;
    }

    /** Exchanges the two nodes in place, without allocating. */
    private void swap(int n1Ix, int n2Ix) {
        for (int i = 0; i < ELEMENT_SIZE; i++) {
            int tmp = elements[n1Ix + i];
            elements[n1Ix + i] = elements[n2Ix + i];
            elements[n2Ix + i] = tmp;
        }
    }

    /** Offset of the parent of node offset {@code i}, or NOT_EXIST for the root. */
    private int getParentIx(int i) {
        if (i < ELEMENT_SIZE) {
            return NOT_EXIST;
        }
        return ((i - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
    }

    /** Offset of the grandparent of node offset {@code i}, or NOT_EXIST if there is none. */
    private int getGrandParent(int i) {
        int p = getParentIx(i);
        return p != NOT_EXIST ? getParentIx(p) : NOT_EXIST;
    }

    /** Offset of the left child of node offset {@code i}, or NOT_EXIST if it is not occupied. */
    private int getLeftChild(int i) {
        int lc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + ELEMENT_SIZE;
        return (lc < nextIx ? lc : NOT_EXIST);
    }

    /** Offsets of the two children of the left child of {@code i} (each may be NOT_EXIST). */
    private int[] getLeftGrandChildren(int i) {
        int lc = getLeftChild(i);
        return lc != NOT_EXIST ? new int[] { getLeftChild(lc), getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
    }

    /** Offset of the right child of node offset {@code i}, or NOT_EXIST if it is not occupied. */
    private int getRightChild(int i) {
        int rc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
        return (rc < nextIx ? rc : NOT_EXIST);
    }

    /** Offsets of the two children of the right child of {@code i} (each may be NOT_EXIST). */
    private int[] getRightGrandChildren(int i) {
        int rc = getRightChild(i);
        return rc != NOT_EXIST ? new int[] { getLeftChild(rc), getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
    }

    /** True when node offset {@code i} is at an even depth (a min level). */
    private boolean isAtMinLevel(int i) {
        return getLevel(i) % 2 == 0;
    }

    /**
     * Returns the depth of node offset {@code i} (root = 0). The node with
     * 0-based number n lies at depth floor(log2(n + 1)). This is computed with
     * exact integer arithmetic: Math.log is only specified to within 1 ulp, so
     * a floating-point floor(log(n)/log(2)) can round down at powers of two
     * and misclassify the level.
     */
    private int getLevel(int i) {
        int nodeNum = i / ELEMENT_SIZE;
        return 31 - Integer.numberOfLeadingZeros(nodeNum + 1);
    }

    private ByteBuffer getFrame(int frameIx) {
        return (memMgr.getFrame(frameIx));
    }

    /**
     * Compares the nodes at the two offsets directly in the backing array.
     * Order: run id, then PNK as an unsigned int, then the referenced tuples'
     * sort fields.
     *
     * @return negative if the first node is smaller, positive if larger, 0 if equal
     */
    private int compare(int nodeSIx1, int nodeSIx2) {
        int run1 = elements[nodeSIx1 + RUN_ID_IX];
        int run2 = elements[nodeSIx2 + RUN_ID_IX];
        if (run1 != run2) {
            return run1 < run2 ? -1 : 1;
        }
        // Compare poor man's normalized keys as unsigned 32-bit values.
        int pnk1 = elements[nodeSIx1 + PNK_IX];
        int pnk2 = elements[nodeSIx2 + PNK_IX];
        if (pnk1 != pnk2) {
            return ((pnk1 & 0xffffffffL) < (pnk2 & 0xffffffffL)) ? -1 : 1;
        }
        return compare(getFrame(elements[nodeSIx1 + FRAME_IX]), getFrame(elements[nodeSIx2 + FRAME_IX]),
                elements[nodeSIx1 + OFFSET_IX], elements[nodeSIx2 + OFFSET_IX]);
    }

    /**
     * Compares two tuples field by field over {@code sortFields} using the
     * configured comparators. The tuple offsets are advanced by
     * BSTNodeUtil.HEADER_SIZE before the field-end slots are read (presumably a
     * per-tuple header written by BSTNodeUtil; confirm against that class).
     *
     * @return the first non-zero comparator result, or 0 if all fields are equal
     */
    private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset) {
        byte[] b1 = fr1.array();
        byte[] b2 = fr2.array();
        fta1.reset(fr1);
        fta2.reset(fr2);
        int headerLen = BSTNodeUtil.HEADER_SIZE;
        r1StartOffset += headerLen;
        r2StartOffset += headerLen;
        for (int f = 0; f < comparators.length; ++f) {
            int fIdx = sortFields[f];
            // Field slots hold end offsets; a field starts where the previous one ends.
            int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
            int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
            int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
            int l1 = f1End - f1Start;
            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
            int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
            int l2 = f2End - f2Start;
            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }
}