package edu.uci.ics.hyracks.dataflow.std.join;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
/**
 * Applies one level of Hybrid Hash Join (HHJ) to a pair of relations: the build
 * relation is partitioned (and spilled as needed) first, and the probe relation
 * is then streamed against the resulting partitions. This class is always
 * driven by the operator descriptor.
 *
 * @author pouria
 */
public class OptimizedHybridHashJoin {
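/*
 * Typical call sequence, as driven by the descriptor (the names below are
 * illustrative only):
 *
 *   joiner.initBuild();
 *   while (moreBuildFrames) { joiner.build(buildFrame); }
 *   joiner.closeBuild();
 *   joiner.initProbe();
 *   while (moreProbeFrames) { joiner.probe(probeFrame, writer); }
 *   joiner.closeProbe(writer);
 *   // Partitions left spilled (see getPartitinStatus()) are re-joined by the
 *   // descriptor, reading them back via getBuildRFReader(pid) / getProbeRFReader(pid).
 */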
private final int NO_MORE_FREE_BUFFER = -1;
private final int END_OF_PARTITION = -1;
private final int INVALID_BUFFER = -2;
private final int UNALLOCATED_FRAME = -3;
private final int BUFFER_FOR_RESIDENT_PARTS = -1;
private IHyracksTaskContext ctx;
private final String rel0Name;
private final String rel1Name;
private final int[] buildKeys;
private final int[] probeKeys;
private final IBinaryComparator[] comparators;
private ITuplePartitionComputer buildHpc;
private ITuplePartitionComputer probeHpc;
private final RecordDescriptor buildRd;
private final RecordDescriptor probeRd;
private RunFileWriter[] buildRFWriters; //writing spilled build partitions
private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
private final boolean isLeftOuter;
private final INullWriter[] nullWriters1;
private ByteBuffer[] memBuffs; //Memory buffers for build
private int[] curPBuff; //Current (last) Buffer for each partition
private int[] nextBuff; //Next buffer in each partition's buffer chain (END_OF_PARTITION terminates a chain, UNALLOCATED_FRAME marks a never-used frame; for freed frames this is the free-list link)
private int[] buildPSizeInTups; //Size of build partitions (in tuples)
private int[] probePSizeInTups; //Size of probe partitions (in tuples)
private int nextFreeBuffIx; //Index of next available free buffer to allocate/use
private BitSet pStatus; //0=resident, 1=spilled
private int numOfPartitions;
private int memForJoin;
private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
private final FrameTupleAccessor accessorBuild;
private final FrameTupleAccessor accessorProbe;
private FrameTupleAppender buildTupAppender;
private FrameTupleAppender probeTupAppenderToResident;
private FrameTupleAppender probeTupAppenderToSpilled;
private int numOfSpilledParts;
private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
private int[] buildPSizeInFrames; //Used for partition tuning
private int freeFramesCounter; //Used for partition tuning
private boolean isTableEmpty; //Handles the case where the build side is empty (tableSize is 0)
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc) {
this(ctx, memForJoin, numOfPartitions, rel0Name, rel1Name, keys0, keys1, comparators, buildRd, probeRd,
probeHpc, buildHpc, false, null); //Inner-join variant: delegates to the full constructor with no left-outer handling and no null writers
}
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
this.ctx = ctx;
this.memForJoin = memForJoin;
this.buildRd = buildRd;
this.probeRd = probeRd;
this.buildHpc = buildHpc;
this.probeHpc = probeHpc;
this.buildKeys = keys1;
this.probeKeys = keys0;
this.comparators = comparators;
this.rel0Name = rel0Name;
this.rel1Name = rel1Name;
this.numOfPartitions = numOfPartitions;
this.buildRFWriters = new RunFileWriter[numOfPartitions];
this.probeRFWriters = new RunFileWriter[numOfPartitions];
this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
this.isLeftOuter = isLeftOuter;
this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
}
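/**
 * Prepares the build phase: allocates one dedicated frame per partition as the
 * head of that partition's buffer chain and leaves the remaining frames of the
 * join memory unallocated until they are requested.
 */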
public void initBuild() {
memBuffs = new ByteBuffer[memForJoin];
curPBuff = new int[numOfPartitions];
nextBuff = new int[memForJoin];
pStatus = new BitSet(numOfPartitions);
buildPSizeInTups = new int[numOfPartitions];
buildPSizeInFrames = new int[numOfPartitions];
freeFramesCounter = memForJoin - numOfPartitions;
for (int i = 0; i < numOfPartitions; i++) { //Allocate one buffer per partition and set it as the head of that partition's buffer chain
memBuffs[i] = ctx.allocateFrame();
curPBuff[i] = i;
nextBuff[i] = -1;
buildPSizeInFrames[i] = 1; //The dedicated initial buffer
}
nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
for (int i = numOfPartitions; i < memBuffs.length; i++) {
nextBuff[i] = UNALLOCATED_FRAME;
}
buildTupAppender = new FrameTupleAppender(ctx.getFrameSize());
}
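/**
 * Consumes one frame of the build relation: each tuple is hash-partitioned and
 * appended to its build partition (in memory or, if the partition has spilled,
 * through its run file).
 */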
public void build(ByteBuffer buffer) throws HyracksDataException {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
boolean print = false; //Debug flag: dump the incoming build frame
if (print) {
accessorBuild.prettyPrint();
}
for (int i = 0; i < tupleCount; ++i) {
int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
processTuple(i, pid);
buildPSizeInTups[pid]++;
}
}
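//Appends one build tuple to partition pid. For a resident partition the tuple goes into the partition's
//in-memory buffer chain, growing the chain (and spilling the largest resident partition) when memory runs out.
//For a spilled partition the tuple goes into the partition's single dedicated buffer, which is flushed to the
//partition's run file whenever it fills up.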
private void processTuple(int tid, int pid) throws HyracksDataException {
ByteBuffer partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
if (!pStatus.get(pid)) { //resident partition
buildTupAppender.reset(partition, false);
while (true) {
if (buildTupAppender.append(accessorBuild, tid)) { //Tuple added to resident partition successfully
break;
}
//partition does not have enough room
int newBuffIx = allocateFreeBuffer(pid);
if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
int pidToSpill = selectPartitionToSpill();
if (pidToSpill == -1) { //No more partitions to spill
throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
}
spillPartition(pidToSpill);
buildTupAppender.reset(memBuffs[pidToSpill], true); //Reinitialize the spilled partition's dedicated (now empty) buffer
processTuple(tid, pid); //Retry: the spill has freed buffers for this partition
break;
} //New Buffer allocated successfully
partition = memBuffs[curPBuff[pid]]; //The partition's current buffer was just updated by the allocateFreeBuffer() call above
buildTupAppender.reset(partition, true);
if (!buildTupAppender.append(accessorBuild, tid)) {
throw new HyracksDataException("Invalid State (Can not append to newly allocated buffer)");
}
buildPSizeInFrames[pid]++;
break;
}
} else { //spilled partition
boolean needClear = false;
while (true) {
buildTupAppender.reset(partition, needClear);
if (buildTupAppender.append(accessorBuild, tid)) {
break;
}
//The partition's dedicated in-memory buffer is full; flush it to the run file first
buildWrite(pid, partition);
partition.clear();
needClear = true;
buildPSizeInFrames[pid]++;
}
}
}
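//Takes the frame at the head of the free list, makes it the new head of partition pid's buffer chain
//(linking it to the previous head), and advances the free-list head. Returns the index of the newly
//assigned frame, or NO_MORE_FREE_BUFFER if no free frame is left.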
private int allocateFreeBuffer(int pid) {
if (nextFreeBuffIx != NO_MORE_FREE_BUFFER) {
if (memBuffs[nextFreeBuffIx] == null) {
memBuffs[nextFreeBuffIx] = ctx.allocateFrame();
}
int curPartBuffIx = curPBuff[pid];
curPBuff[pid] = nextFreeBuffIx;
int oldNext = nextBuff[nextFreeBuffIx];
nextBuff[nextFreeBuffIx] = curPartBuffIx;
if (oldNext == UNALLOCATED_FRAME) {
nextFreeBuffIx++;
if (nextFreeBuffIx == memForJoin) { //No more free buffer
nextFreeBuffIx = NO_MORE_FREE_BUFFER;
}
} else {
nextFreeBuffIx = oldNext;
}
(memBuffs[curPBuff[pid]]).clear();
freeFramesCounter--;
return (curPBuff[pid]);
} else {
return NO_MORE_FREE_BUFFER; //A partition needs to be spilled (if feasible)
}
}
private int selectPartitionToSpill() {
int maxSize = -1;
int partitionToSpill = -1;
for (int i = 0; i < buildPSizeInTups.length; i++) { //Find the largest resident partition to spill
if (!pStatus.get(i) && (buildPSizeInTups[i] > maxSize)) {
maxSize = buildPSizeInTups[i];
partitionToSpill = i;
}
}
return partitionToSpill;
}
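//Spills partition pid: flushes every frame in its buffer chain to the partition's run file, returns all
//frames except the partition's dedicated frame to the free list, and marks the partition as spilled.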
private void spillPartition(int pid) throws HyracksDataException {
int curBuffIx = curPBuff[pid];
ByteBuffer buff = null;
while (curBuffIx != END_OF_PARTITION) {
buff = memBuffs[curBuffIx];
buildWrite(pid, buff);
buff.clear();
int freedBuffIx = curBuffIx;
curBuffIx = nextBuff[curBuffIx];
if (freedBuffIx != pid) {
nextBuff[freedBuffIx] = nextFreeBuffIx;
nextFreeBuffIx = freedBuffIx;
freeFramesCounter++;
}
}
curPBuff[pid] = pid;
pStatus.set(pid);
}
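//Appends a frame to the build run file of partition pid, lazily creating and opening the writer on first use.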
private void buildWrite(int pid, ByteBuffer buff) throws HyracksDataException {
RunFileWriter writer = buildRFWriters[pid];
if (writer == null) {
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(rel0Name);
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
buildRFWriters[pid] = writer;
}
writer.nextFrame(buff);
}
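/**
 * Finishes the build phase: reclaims the frames of empty partitions, flushes and releases the dedicated
 * buffers of spilled partitions, tries to reload spilled partitions that now fit in memory (partition
 * tuning), and builds the in-memory hash table over all resident partitions.
 */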
public void closeBuild() throws HyracksDataException {
for (int i = 0; i < numOfPartitions; i++) { //Reclaim the frames allocated to empty partitions
if (buildPSizeInTups[i] == 0) {
buildPSizeInFrames[i]--;
nextBuff[curPBuff[i]] = nextFreeBuffIx;
nextFreeBuffIx = curPBuff[i];
curPBuff[i] = INVALID_BUFFER;
freeFramesCounter++;
}
}
ByteBuffer buff = null;
for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //Flush and deallocate the dedicated buffers of the spilled partitions
buff = memBuffs[i];
accessorBuild.reset(buff);
if (accessorBuild.getTupleCount() > 0) {
buildWrite(i, buff);
buildPSizeInFrames[i]++;
}
nextBuff[i] = nextFreeBuffIx;
nextFreeBuffIx = i;
freeFramesCounter++;
curPBuff[i] = INVALID_BUFFER;
if (buildRFWriters[i] != null) {
buildRFWriters[i].close();
}
}
partitionTune(); //Trying to bring back as many spilled partitions as possible, making them resident
int inMemTupCount = 0;
numOfSpilledParts = 0;
for (int i = 0; i < numOfPartitions; i++) {
if (!pStatus.get(i)) {
inMemTupCount += buildPSizeInTups[i];
} else {
numOfSpilledParts++;
}
}
createInMemoryJoiner(inMemTupCount);
cacheInMemJoin();
this.isTableEmpty = (inMemTupCount == 0);
}
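//Partition tuning: after the build phase, reloads as many spilled partitions as fit into the currently
//free frames, turning them back into resident partitions so they can be joined in memory.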
private void partitionTune() throws HyracksDataException {
reloadBuffer = ctx.allocateFrame();
ArrayList<Integer> reloadSet = selectPartitionsToReload();
for (int i = 0; i < reloadSet.size(); i++) {
int pid = reloadSet.get(i);
int[] buffsToLoad = new int[buildPSizeInFrames[pid]];
for (int j = 0; j < buffsToLoad.length; j++) {
buffsToLoad[j] = nextFreeBuffIx;
int oldNext = nextBuff[nextFreeBuffIx];
if (oldNext == UNALLOCATED_FRAME) {
nextFreeBuffIx++;
if (nextFreeBuffIx == memForJoin) { //No more free buffer
nextFreeBuffIx = NO_MORE_FREE_BUFFER;
}
} else {
nextFreeBuffIx = oldNext;
}
}
curPBuff[pid] = buffsToLoad[0];
for (int k = 1; k < buffsToLoad.length; k++) {
nextBuff[buffsToLoad[k - 1]] = buffsToLoad[k];
}
loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
}
reloadSet.clear();
reloadSet = null;
}
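//Reads the spilled partition's run file back into the reserved in-memory frames, terminates the rebuilt
//buffer chain, clears the partition's spilled status, and discards its run file writer.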
private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
RunFileReader r = wr.createReader();
r.open();
int counter = 0;
ByteBuffer mBuff = null;
reloadBuffer.clear();
while (r.nextFrame(reloadBuffer)) {
mBuff = memBuffs[buffs[counter]];
if (mBuff == null) {
mBuff = ctx.allocateFrame();
memBuffs[buffs[counter]] = mBuff;
}
FrameUtils.copy(reloadBuffer, mBuff);
counter++;
reloadBuffer.clear();
}
nextBuff[buffs[buffs.length - 1]] = END_OF_PARTITION; //Terminate the reloaded chain; the free-list head (nextFreeBuffIx) was already advanced by partitionTune(), and the stale link stored here may be UNALLOCATED_FRAME
r.close();
pStatus.set(pid, false);
buildRFWriters[pid] = null;
}
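//Greedily picks spilled partitions whose frame counts fit into the remaining free frames, reserving
//those frames (via freeFramesCounter) for the subsequent reload.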
private ArrayList<Integer> selectPartitionsToReload() {
ArrayList<Integer> p = new ArrayList<Integer>();
for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) {
if (buildPSizeInFrames[i] > 0 && (freeFramesCounter - buildPSizeInFrames[i] >= 0)) {
p.add(i);
freeFramesCounter -= buildPSizeInFrames[i];
}
if (freeFramesCounter < 1) { //No more free buffer available
return p;
}
}
return p;
}
private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
comparators), isLeftOuter, nullWriters1, table);
}
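//Feeds every frame of every resident partition into the in-memory joiner to build its hash table.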
private void cacheInMemJoin() throws HyracksDataException {
for (int pid = 0; pid < numOfPartitions; pid++) {
if (!pStatus.get(pid)) {
int nextBuffIx = curPBuff[pid];
while (nextBuffIx > -1) { //Walk the chain until END_OF_PARTITION (or INVALID_BUFFER)
inMemJoiner.build(memBuffs[nextBuffIx]);
nextBuffIx = nextBuff[nextBuffIx];
}
}
}
}
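/**
 * Prepares the probe phase: allocates one output buffer per spilled partition plus a single shared
 * buffer for tuples that probe the resident (in-memory) partitions.
 */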
public void initProbe() {
sPartBuffs = new ByteBuffer[numOfSpilledParts];
for (int i = 0; i < numOfSpilledParts; i++) {
sPartBuffs[i] = ctx.allocateFrame();
}
curPBuff = new int[numOfPartitions];
int nextBuffIxToAlloc = 0;
/* We only need to allocate one frame per spilled partition.
* Resident partitions do not need frames in probe, as their tuples join
* immediately with the resident build tuples using the inMemoryHashJoin */
for (int i = 0; i < numOfPartitions; i++) {
curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
}
probePSizeInTups = new int[numOfPartitions];
probeRFWriters = new RunFileWriter[numOfPartitions];
probeResBuff = ctx.allocateFrame();
probeTupAppenderToResident = new FrameTupleAppender(ctx.getFrameSize());
probeTupAppenderToResident.reset(probeResBuff, true);
probeTupAppenderToSpilled = new FrameTupleAppender(ctx.getFrameSize());
}
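/**
 * Consumes one frame of the probe relation. If nothing was spilled, the frame is joined directly against
 * the in-memory hash table. Otherwise each tuple is hash-partitioned: tuples of spilled partitions are
 * buffered and written to probe run files, while tuples of resident partitions are batched in probeResBuff
 * and joined in memory whenever that buffer fills up. Tuples whose build partition is empty are skipped.
 */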
public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
accessorProbe.reset(buffer);
int tupleCount = accessorProbe.getTupleCount();
boolean print = false; //Debug flag: dump the incoming probe frame
if (print) {
accessorProbe.prettyPrint();
}
if (numOfSpilledParts == 0) {
inMemJoiner.join(buffer, writer);
return;
}
for (int i = 0; i < tupleCount; ++i) {
int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
if (buildPSizeInTups[pid] > 0) { //Tuple has a potential match from the build phase
if (pStatus.get(pid)) { //pid is Spilled
boolean needToClear = false;
ByteBuffer buff = sPartBuffs[curPBuff[pid]];
while (true) {
probeTupAppenderToSpilled.reset(buff, needToClear);
if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
break;
}
probeWrite(pid, buff);
buff.clear();
needToClear = true;
}
} else { //pid is Resident
while (true) {
if (probeTupAppenderToResident.append(accessorProbe, i)) {
break;
}
inMemJoiner.join(probeResBuff, writer);
probeTupAppenderToResident.reset(probeResBuff, true);
}
}
probePSizeInTups[pid]++;
}
}
}
public void closeProbe(IFrameWriter writer) throws HyracksDataException { //Spilled partitions are NOT joined here; the descriptor decides which join technique to use for them
inMemJoiner.join(probeResBuff, writer);
inMemJoiner.closeJoin(writer);
for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
ByteBuffer buff = sPartBuffs[curPBuff[pid]];
accessorProbe.reset(buff);
if (accessorProbe.getTupleCount() > 0) {
probeWrite(pid, buff);
}
closeProbeWriter(pid);
}
}
private void probeWrite(int pid, ByteBuffer buff) throws HyracksDataException {
RunFileWriter pWriter = probeRFWriters[pid];
if (pWriter == null) {
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(rel1Name);
pWriter = new RunFileWriter(file, ctx.getIOManager());
pWriter.open();
probeRFWriters[pid] = pWriter;
}
pWriter.nextFrame(buff);
}
private void closeProbeWriter(int pid) throws HyracksDataException {
RunFileWriter writer = probeRFWriters[pid];
if (writer != null) {
writer.close();
}
}
public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader());
}
public long getBuildPartitionSize(int pid) {
return ((buildRFWriters[pid] == null) ? 0 : buildRFWriters[pid].getFileSize());
}
public int getBuildPartitionSizeInTup(int pid) {
return (buildPSizeInTups[pid]);
}
public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader());
}
public long getProbePartitionSize(int pid) {
return ((probeRFWriters[pid] == null) ? 0 : probeRFWriters[pid].getFileSize());
}
public int getProbePartitionSizeInTup(int pid) {
return (probePSizeInTups[pid]);
}
public int getMaxBuildPartitionSize() {
int max = buildPSizeInTups[0];
for (int i = 1; i < buildPSizeInTups.length; i++) {
if (buildPSizeInTups[i] > max) {
max = buildPSizeInTups[i];
}
}
return max;
}
public int getMaxProbePartitionSize() {
int max = probePSizeInTups[0];
for (int i = 1; i < probePSizeInTups.length; i++) {
if (probePSizeInTups[i] > max) {
max = probePSizeInTups[i];
}
}
return max;
}
public BitSet getPartitinStatus() {
return pStatus;
}
public String debugGetStats() {
int numOfResidentPartitions = 0;
int numOfSpilledPartitions = 0;
double sumOfBuildSpilledSizes = 0;
double sumOfProbeSpilledSizes = 0;
int numOfInMemTups = 0;
for (int i = 0; i < numOfPartitions; i++) {
if (pStatus.get(i)) { //Spilled
numOfSpilledPartitions++;
sumOfBuildSpilledSizes += buildPSizeInTups[i];
sumOfProbeSpilledSizes += probePSizeInTups[i];
} else { //Resident
numOfResidentPartitions++;
numOfInMemTups += buildPSizeInTups[i];
}
}
double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
+ numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
+ avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
+ freeFramesCounter;
return s;
}
public boolean isTableEmpty() {
return this.isTableEmpty;
}
}