blob: c6c899d11d1f76d50c24190af72e76c8f8ebbfda [file] [log] [blame]
package edu.uci.ics.hyracks.dataflow.std.join;
import java.nio.ByteBuffer;
import java.util.BitSet;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionGeneratorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerGeneratorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
* @author pouria
* This class guides the joining process, and switches between different
joining techniques, w.r.t the implemented optimizations and skew in size of the
- Operator overview:
Assume we are trying to do (R Join S), with M buffers available, while we have an estimate on the size
of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe, where in our implementation Probe phase
can apply HHJ recursively, based on the value of M and size of R and S. HHJ phases proceed as follow:
– Calculate number of partitions (Based on the size of R, fudge factor and M) [See Shapiro's paper for the detailed discussion].
– Initialize the build phase (one frame per partition – all partitions considered resident at first)
– Read tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find
its target partition and try to append it to that partition:
– If target partition's buffer is full, try to allocate a new buffer for it.
– if no free buffer is available, find the largest resident partition and spill it. Using its freed
buffers after spilling, allocate a new buffer for the target partition.
– Being done with R, close the build phase. (During closing we write the very last buffer of each
spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers, belonging to
spilled partitions as possible into memory, based on the free buffers - We will stop at the point where remaining free buffers is not enough
for reloading an entire partition back into memory)
– Create the hash table for the resident partitions (basically we create an in-memory hash join here)
– Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer
for the whole resident partitions)
– Read tuples of S, frame by frame and hash each tuple T to its target partition P
– if P is a resident partition, pass T to the in-memory hash join and generate the output record,
if any matching(s) record found
– if P is spilled, write T to the dedicated buffer for P (on the probe side)
– Once scanning of S is done, we try to join partition pairs (Ri, Si) of the spilled partitions:
– if any of Ri or Si is smaller than M, then we simply use an in-memory hash join to join them
– otherwise we apply HHJ recursively:
– if after applying HHJ recursively, we do not gain enough size reduction (max size of the
resulting partitions were more than 80% of the initial Ri,Si size) then we switch to
nested loop join for joining.
– (At each step of partition-pair joining, we consider role reversal, which means if size of Si were
greater than Ri, then we make sure that we switch the roles of build/probe between them)
public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
private static final long serialVersionUID = 1L;
private static final double NLJ_SWITCH_THRESHOLD = 0.8;
private static final String PROBE_REL = "RelR";
private static final String BUILD_REL = "RelS";
private final int memsize;
private final int inputsize0;
private final double fudgeFactor;
private final int[] probeKeys;
private final int[] buildKeys;
private final IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories;
private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, double factor,
int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory0,
ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter,
INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
this.fudgeFactor = factor;
this.probeKeys = keys0;
this.buildKeys = keys1;
this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
this.comparatorFactories = comparatorFactories;
this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, double factor,
int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
this.fudgeFactor = factor;
this.probeKeys = keys0;
this.buildKeys = keys1;
this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
this.comparatorFactories = comparatorFactories;
this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = false;
this.nullWriterFactories1 = null;
public void contributeActivities(IActivityGraphBuilder builder) {
PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(new ActivityId(odId,
ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(new ActivityId(odId,
builder.addSourceEdge(0, phase1, 0);
builder.addSourceEdge(1, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
builder.addTargetEdge(0, phase2, 0);
//memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
throws HyracksDataException {
int numberOfPartitions = 0;
if (memorySize <= 1) {
throw new HyracksDataException("not enough memory is available for Hybrid Hash Join");
if (memorySize > buildSize) {
return 1; //We will switch to in-Mem HJ eventually
numberOfPartitions = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize) / (double) (memorySize - 1)));
if (numberOfPartitions <= 0) {
numberOfPartitions = 1; //becomes in-memory hash join
if (numberOfPartitions > memorySize) {
numberOfPartitions = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions));
return (numberOfPartitions < memorySize ? numberOfPartitions : memorySize);
return numberOfPartitions;
public static class BuildAndPartitionTaskState extends AbstractTaskState {
private int memForJoin;
private int numOfPartitions;
private OptimizedHybridHashJoin hybridHJ;
public BuildAndPartitionTaskState() {
private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
public void toBytes(DataOutput out) throws IOException {
public void fromBytes(DataInput in) throws IOException {
* Build phase of Hybrid Hash Join:
* Creating an instance of Hybrid Hash Join, using Shapiro's formula
* to get the optimal number of partitions, build relation is read and
* partitioned, and hybrid hash join instance gets ready for the probing.
* (See OptimizedHybridHashJoin for the details on different steps)
private class PartitionAndBuildActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
public PartitionAndBuildActivityNode(ActivityId id) {
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
.getJobId(), new TaskId(getActivityId(), partition));
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys,
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys,
public void open() throws HyracksDataException {
if (memsize <= 2){ //Dedicated buffers: One buffer to read and one buffer for output
throw new HyracksDataException("not enough memory for Hybrid Hash Join");
state.memForJoin = memsize - 2;
state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {;
public void close() throws HyracksDataException {
public void fail() throws HyracksDataException {
return op;
* Probe phase of Hybrid Hash Join:
* Reading the probe side and partitioning it, resident tuples get
* joined with the build side residents (through formerly created HybridHashJoin in the build phase)
* and spilled partitions get written to run files. During the close() call, pairs of spilled partition
* (build side spilled partition and its corresponding probe side spilled partition) join, by applying
* Hybrid Hash Join recursively on them.
private class ProbeAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
public ProbeAndJoinActivityNode(ActivityId id) {
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator();
final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator();
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
private ByteBuffer rPartbuff = ctx.allocateFrame();
private ITuplePartitionComputerGeneratorFactory hpcf0 = new FieldHashPartitionComputerGeneratorFactory(
probeKeys, hashFunctionGeneratorFactories);
private ITuplePartitionComputerGeneratorFactory hpcf1 = new FieldHashPartitionComputerGeneratorFactory(
buildKeys, hashFunctionGeneratorFactories);
private ITuplePartitionComputer hpcRep0;
private ITuplePartitionComputer hpcRep1;
public void open() throws HyracksDataException {
state = (BuildAndPartitionTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
state.hybridHJ.probe(buffer, writer);
public void fail() throws HyracksDataException {;
public void close() throws HyracksDataException {
BitSet partitionStatus = state.hybridHJ.getPartitinStatus();
hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0)
hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1)
for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
if (bReader == null || pReader == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
int beforeMax = (bSize > pSize) ? bSize : pSize;
joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException {
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys,
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys,
long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
//Apply in-Mem HJ if possible
if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
int tabSize = -1;
if (buildPartSize < probePartSize) {
tabSize = ohhj.getBuildPartitionSizeInTup(pid);
if(tabSize == 0){
throw new HyracksDataException("Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
//Build Side is smaller
applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
buildSideReader, probeSideReader);
} else { //Role Reversal
tabSize = ohhj.getProbePartitionSizeInTup(pid);
if(tabSize == 0){
throw new HyracksDataException("Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
//Probe Side is smaller
applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
probeSideReader, buildSideReader);
//Apply (Recursive) HHJ
else {
OptimizedHybridHashJoin rHHj;
if (buildPartSize < probePartSize) { //Build Side is smaller
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor, nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);;
while (buildSideReader.nextFrame(rPartbuff)) {;
while (probeSideReader.nextFrame(rPartbuff)) {
rHHj.probe(rPartbuff, writer);
int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
if (rbrfw == null || rprfw == null) {
joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1));
} else { //Switch to NLJ (Further recursion seems not to be useful)
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
if (rbrfw == null || rprfw == null) {
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
if (buildSideInTups < probeSideInTups) {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
} else {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
} else { //Role Reversal (Probe Side is smaller)
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor, nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);;
while (probeSideReader.nextFrame(rPartbuff)) {;
while (buildSideReader.nextFrame(rPartbuff)) {
rHHj.probe(rPartbuff, writer);
int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
if (rbrfw == null || rprfw == null) {
joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
} else { //Switch to NLJ (Further recursion seems not to be effective)
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
if (rbrfw == null || rprfw == null) {
long buildSideSize = rbrfw.getFileSize();
long probeSideSize = rprfw.getFileSize();
if (buildSideSize > probeSideSize) {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
} else {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
isLeftOuter, nullWriters1, table);;
while (bReader.nextFrame(rPartbuff)) {
ByteBuffer copyBuffer = ctx.allocateFrame(); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
FrameUtils.copy(rPartbuff, copyBuffer);
// probe;
while (pReader.nextFrame(rPartbuff)) {
joiner.join(rPartbuff, writer);
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator)
throws HyracksDataException {
NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize);
ByteBuffer cacheBuff = ctx.allocateFrame();;
while (innerReader.nextFrame(cacheBuff)) {
ByteBuffer joinBuff = ctx.allocateFrame();;
while (outerReader.nextFrame(joinBuff)) {
nlj.join(joinBuff, writer);
return op;