| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.dataflow.std.join; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.BitSet; |
| |
| import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; |
| import edu.uci.ics.hyracks.api.dataflow.ActivityId; |
| import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder; |
| import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable; |
| import edu.uci.ics.hyracks.api.dataflow.TaskId; |
| import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator; |
| import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; |
| import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; |
| import edu.uci.ics.hyracks.api.dataflow.value.INullWriter; |
| import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory; |
| import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider; |
| import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator; |
| import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory; |
| import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer; |
| import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily; |
| import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry; |
| import edu.uci.ics.hyracks.api.job.JobId; |
| import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor; |
| import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator; |
| import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils; |
| import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily; |
| import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerGeneratorFactory; |
| import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader; |
| import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode; |
| import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject; |
| import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; |
| import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; |
| import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable; |
| import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable; |
| |
| /** |
| * @author pouria |
| * This class guides the joining process, and switches between different |
| * joining techniques, w.r.t the implemented optimizations and skew in size of the |
| * partitions. |
| * - Operator overview: |
| * Assume we are trying to do (R Join S), with M buffers available, while we have an estimate on the size |
| * of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe, where in our implementation Probe phase |
| * can apply HHJ recursively, based on the value of M and size of R and S. HHJ phases proceed as follow: |
| * BUILD: |
| * Calculate number of partitions (Based on the size of R, fudge factor and M) [See Shapiro's paper for the detailed discussion]. |
| * Initialize the build phase (one frame per partition, all partitions considered resident at first) |
| * Read tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find |
| * its target partition and try to append it to that partition: |
| * If target partition's buffer is full, try to allocate a new buffer for it. |
| * if no free buffer is available, find the largest resident partition and spill it. Using its freed |
| * buffers after spilling, allocate a new buffer for the target partition. |
| * Being done with R, close the build phase. (During closing we write the very last buffer of each |
| * spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers, belonging to |
| * spilled partitions as possible into memory, based on the free buffers - We will stop at the point where remaining free buffers is not enough |
| * for reloading an entire partition back into memory) |
| * Create the hash table for the resident partitions (basically we create an in-memory hash join here) |
| * PROBE: |
| * Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer |
| * for the whole resident partitions) |
| * Read tuples of S, frame by frame and hash each tuple T to its target partition P |
| * if P is a resident partition, pass T to the in-memory hash join and generate the output record, |
| * if any matching(s) record found |
| * if P is spilled, write T to the dedicated buffer for P (on the probe side) |
| * Once scanning of S is done, we try to join partition pairs (Ri, Si) of the spilled partitions: |
| * if any of Ri or Si is smaller than M, then we simply use an in-memory hash join to join them |
| * otherwise we apply HHJ recursively: |
| * if after applying HHJ recursively, we do not gain enough size reduction (max size of the |
| * resulting partitions were more than 80% of the initial Ri,Si size) then we switch to |
| * nested loop join for joining. |
| * (At each step of partition-pair joining, we consider role reversal, which means if size of Si were |
| * greater than Ri, then we make sure that we switch the roles of build/probe between them) |
| */ |
| |
| public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor { |
| private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0; |
| private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1; |
| |
| private static final long serialVersionUID = 1L; |
| private static final double NLJ_SWITCH_THRESHOLD = 0.8; |
| |
| private static final String PROBE_REL = "RelR"; |
| private static final String BUILD_REL = "RelS"; |
| |
| private final int memsize; |
| private final int inputsize0; |
| private final double fudgeFactor; |
| private final int[] probeKeys; |
| private final int[] buildKeys; |
| private final IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories; |
| private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ |
| private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe |
| private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe |
| |
| private final boolean isLeftOuter; |
| private final INullWriterFactory[] nullWriterFactories1; |
| |
| public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, |
| double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, |
| IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, |
| ITuplePairComparatorFactory tupPaircomparatorFactory0, |
| ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter, |
| INullWriterFactory[] nullWriterFactories1) throws HyracksDataException { |
| |
| super(spec, 2, 1); |
| this.memsize = memsize; |
| this.inputsize0 = inputsize0; |
| this.fudgeFactor = factor; |
| this.probeKeys = keys0; |
| this.buildKeys = keys1; |
| this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories; |
| this.comparatorFactories = comparatorFactories; |
| this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0; |
| this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1; |
| recordDescriptors[0] = recordDescriptor; |
| this.isLeftOuter = isLeftOuter; |
| this.nullWriterFactories1 = nullWriterFactories1; |
| |
| } |
| |
| public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, |
| double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, |
| IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, |
| ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1) |
| throws HyracksDataException { |
| |
| super(spec, 2, 1); |
| this.memsize = memsize; |
| this.inputsize0 = inputsize0; |
| this.fudgeFactor = factor; |
| this.probeKeys = keys0; |
| this.buildKeys = keys1; |
| this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories; |
| this.comparatorFactories = comparatorFactories; |
| this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0; |
| this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1; |
| recordDescriptors[0] = recordDescriptor; |
| this.isLeftOuter = false; |
| this.nullWriterFactories1 = null; |
| } |
| |
| @Override |
| public void contributeActivities(IActivityGraphBuilder builder) { |
| ActivityId buildAid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID); |
| ActivityId probeAid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID); |
| PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(buildAid, probeAid); |
| ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(probeAid, buildAid); |
| |
| builder.addActivity(this, phase1); |
| builder.addSourceEdge(1, phase1, 0); |
| |
| builder.addActivity(this, phase2); |
| builder.addSourceEdge(0, phase2, 0); |
| |
| builder.addBlockingEdge(phase1, phase2); |
| |
| builder.addTargetEdge(0, phase2, 0); |
| |
| } |
| |
| //memorySize is the memory for join (we have already excluded the 2 buffers for in/out) |
| private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions) |
| throws HyracksDataException { |
| int numberOfPartitions = 0; |
| if (memorySize <= 1) { |
| throw new HyracksDataException("not enough memory is available for Hybrid Hash Join"); |
| } |
| if (memorySize > buildSize) { |
| return 1; //We will switch to in-Mem HJ eventually |
| } |
| numberOfPartitions = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize) |
| / (double) (memorySize - 1))); |
| if (numberOfPartitions <= 0) { |
| numberOfPartitions = 1; //becomes in-memory hash join |
| } |
| if (numberOfPartitions > memorySize) { |
| numberOfPartitions = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions)); |
| return (numberOfPartitions < memorySize ? numberOfPartitions : memorySize); |
| } |
| return numberOfPartitions; |
| } |
| |
| public static class BuildAndPartitionTaskState extends AbstractStateObject { |
| |
| private int memForJoin; |
| private int numOfPartitions; |
| private OptimizedHybridHashJoin hybridHJ; |
| |
| public BuildAndPartitionTaskState() { |
| } |
| |
| private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) { |
| super(jobId, taskId); |
| } |
| |
| @Override |
| public void toBytes(DataOutput out) throws IOException { |
| |
| } |
| |
| @Override |
| public void fromBytes(DataInput in) throws IOException { |
| |
| } |
| |
| } |
| |
| /* |
| * Build phase of Hybrid Hash Join: |
| * Creating an instance of Hybrid Hash Join, using Shapiro's formula |
| * to get the optimal number of partitions, build relation is read and |
| * partitioned, and hybrid hash join instance gets ready for the probing. |
| * (See OptimizedHybridHashJoin for the details on different steps) |
| */ |
| private class PartitionAndBuildActivityNode extends AbstractActivityNode { |
| private static final long serialVersionUID = 1L; |
| |
| private final ActivityId probeAid; |
| |
| public PartitionAndBuildActivityNode(ActivityId id, ActivityId probeAid) { |
| super(id); |
| this.probeAid = probeAid; |
| } |
| |
| @Override |
| public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, |
| IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) { |
| |
| final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); |
| final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0); |
| |
| final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; |
| for (int i = 0; i < comparatorFactories.length; i++) { |
| comparators[i] = comparatorFactories[i].createBinaryComparator(); |
| } |
| |
| IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() { |
| private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext() |
| .getJobId(), new TaskId(getActivityId(), partition)); |
| |
| ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys, |
| hashFunctionGeneratorFactories).createPartitioner(0); |
| ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys, |
| hashFunctionGeneratorFactories).createPartitioner(0); |
| |
| @Override |
| public void open() throws HyracksDataException { |
| if (memsize <= 2) { //Dedicated buffers: One buffer to read and one buffer for output |
| throw new HyracksDataException("not enough memory for Hybrid Hash Join"); |
| } |
| state.memForJoin = memsize - 2; |
| state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, |
| nPartitions); |
| if(!isLeftOuter){ |
| state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions, |
| PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, |
| buildHpc); |
| } |
| else{ |
| state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions, |
| PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, |
| buildHpc, isLeftOuter, nullWriterFactories1); |
| } |
| |
| state.hybridHJ.initBuild(); |
| } |
| |
| @Override |
| public void nextFrame(ByteBuffer buffer) throws HyracksDataException { |
| state.hybridHJ.build(buffer); |
| } |
| |
| @Override |
| public void close() throws HyracksDataException { |
| state.hybridHJ.closeBuild(); |
| ctx.setStateObject(state); |
| } |
| |
| @Override |
| public void fail() throws HyracksDataException { |
| } |
| |
| }; |
| return op; |
| } |
| } |
| |
| /* |
| * Probe phase of Hybrid Hash Join: |
| * Reading the probe side and partitioning it, resident tuples get |
| * joined with the build side residents (through formerly created HybridHashJoin in the build phase) |
| * and spilled partitions get written to run files. During the close() call, pairs of spilled partition |
| * (build side spilled partition and its corresponding probe side spilled partition) join, by applying |
| * Hybrid Hash Join recursively on them. |
| */ |
| private class ProbeAndJoinActivityNode extends AbstractActivityNode { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private final ActivityId buildAid; |
| |
| public ProbeAndJoinActivityNode(ActivityId id, ActivityId buildAid) { |
| super(id); |
| this.buildAid = buildAid; |
| } |
| |
| @Override |
| public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, |
| IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) { |
| |
| final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0); |
| final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); |
| final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; |
| final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx); |
| final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx); |
| |
| for (int i = 0; i < comparatorFactories.length; i++) { |
| comparators[i] = comparatorFactories[i].createBinaryComparator(); |
| } |
| |
| final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null; |
| if (isLeftOuter) { |
| for (int i = 0; i < nullWriterFactories1.length; i++) { |
| nullWriters1[i] = nullWriterFactories1[i].createNullWriter(); |
| } |
| } |
| |
| IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() { |
| private BuildAndPartitionTaskState state; |
| private ByteBuffer rPartbuff = ctx.allocateFrame(); |
| |
| private ITuplePartitionComputerFamily hpcf0 = new FieldHashPartitionComputerFamily(probeKeys, |
| hashFunctionGeneratorFactories); |
| private ITuplePartitionComputerFamily hpcf1 = new FieldHashPartitionComputerFamily(buildKeys, |
| hashFunctionGeneratorFactories); |
| |
| private ITuplePartitionComputer hpcRep0; |
| private ITuplePartitionComputer hpcRep1; |
| |
| @Override |
| public void open() throws HyracksDataException { |
| state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(), |
| BUILD_AND_PARTITION_ACTIVITY_ID), partition)); |
| |
| writer.open(); |
| state.hybridHJ.initProbe(); |
| |
| } |
| |
| @Override |
| public void nextFrame(ByteBuffer buffer) throws HyracksDataException { |
| if(!state.hybridHJ.isTableEmpty()){ |
| state.hybridHJ.probe(buffer, writer); |
| } |
| } |
| |
| @Override |
| public void fail() throws HyracksDataException { |
| writer.fail(); |
| } |
| |
| @Override |
| public void close() throws HyracksDataException { |
| |
| state.hybridHJ.closeProbe(writer); |
| |
| BitSet partitionStatus = state.hybridHJ.getPartitinStatus(); |
| hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0) |
| .createPartitioner(0); |
| hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1) |
| .createPartitioner(0); |
| |
| rPartbuff.clear(); |
| for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) { |
| |
| RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid); |
| RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid); |
| |
| if (bReader == null || pReader == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match) |
| continue; |
| } |
| int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid); |
| int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid); |
| int beforeMax = (bSize > pSize) ? bSize : pSize; |
| joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1); |
| |
| } |
| writer.close(); |
| } |
| |
| private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader, |
| RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException { |
| ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys, |
| hashFunctionGeneratorFactories).createPartitioner(level); |
| ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys, |
| hashFunctionGeneratorFactories).createPartitioner(level); |
| |
| long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize(); |
| long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize(); |
| |
| //Apply in-Mem HJ if possible |
| if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) { |
| int tabSize = -1; |
| if (buildPartSize < probePartSize) { |
| tabSize = ohhj.getBuildPartitionSizeInTup(pid); |
| if (tabSize == 0) { |
| throw new HyracksDataException( |
| "Trying to join an empty partition. Invalid table size for inMemoryHashJoin."); |
| } |
| //Build Side is smaller |
| applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0, |
| buildSideReader, probeSideReader); |
| |
| } else { //Role Reversal |
| tabSize = ohhj.getProbePartitionSizeInTup(pid); |
| if (tabSize == 0) { |
| throw new HyracksDataException( |
| "Trying to join an empty partition. Invalid table size for inMemoryHashJoin."); |
| } |
| //Probe Side is smaller |
| applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1, |
| probeSideReader, buildSideReader); |
| } |
| } |
| //Apply (Recursive) HHJ |
| else { |
| OptimizedHybridHashJoin rHHj; |
| if (buildPartSize < probePartSize) { //Build Side is smaller |
| |
| int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor, |
| nPartitions); |
| rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, |
| probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc); |
| |
| buildSideReader.open(); |
| rHHj.initBuild(); |
| rPartbuff.clear(); |
| while (buildSideReader.nextFrame(rPartbuff)) { |
| rHHj.build(rPartbuff); |
| } |
| |
| rHHj.closeBuild(); |
| |
| probeSideReader.open(); |
| rHHj.initProbe(); |
| rPartbuff.clear(); |
| while (probeSideReader.nextFrame(rPartbuff)) { |
| rHHj.probe(rPartbuff, writer); |
| } |
| rHHj.closeProbe(writer); |
| |
| int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize(); |
| int maxAfterProbeSize = rHHj.getMaxProbePartitionSize(); |
| int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize |
| : maxAfterProbeSize; |
| |
| BitSet rPStatus = rHHj.getPartitinStatus(); |
| if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) { |
| for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { |
| RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); |
| RunFileReader rprfw = rHHj.getProbeRFReader(rPid); |
| |
| if (rbrfw == null || rprfw == null) { |
| continue; |
| } |
| |
| joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1)); |
| } |
| |
| } else { //Switch to NLJ (Further recursion seems not to be useful) |
| for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { |
| RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); |
| RunFileReader rprfw = rHHj.getProbeRFReader(rPid); |
| |
| if (rbrfw == null || rprfw == null) { |
| continue; |
| } |
| |
| int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid); |
| int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid); |
| if (buildSideInTups < probeSideInTups) { |
| applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw, |
| nljComparator0); |
| } else { |
| applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw, |
| nljComparator1); |
| } |
| } |
| } |
| } else { //Role Reversal (Probe Side is smaller) |
| int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor, |
| nPartitions); |
| rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL, |
| buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc); |
| |
| probeSideReader.open(); |
| rHHj.initBuild(); |
| rPartbuff.clear(); |
| while (probeSideReader.nextFrame(rPartbuff)) { |
| rHHj.build(rPartbuff); |
| } |
| rHHj.closeBuild(); |
| rHHj.initProbe(); |
| buildSideReader.open(); |
| rPartbuff.clear(); |
| while (buildSideReader.nextFrame(rPartbuff)) { |
| rHHj.probe(rPartbuff, writer); |
| } |
| rHHj.closeProbe(writer); |
| int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize(); |
| int maxAfterProbeSize = rHHj.getMaxProbePartitionSize(); |
| int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize |
| : maxAfterProbeSize; |
| BitSet rPStatus = rHHj.getPartitinStatus(); |
| |
| if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) { |
| for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { |
| RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); |
| RunFileReader rprfw = rHHj.getProbeRFReader(rPid); |
| |
| if (rbrfw == null || rprfw == null) { |
| continue; |
| } |
| |
| joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1)); |
| } |
| } else { //Switch to NLJ (Further recursion seems not to be effective) |
| for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { |
| RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); |
| RunFileReader rprfw = rHHj.getProbeRFReader(rPid); |
| |
| if (rbrfw == null || rprfw == null) { |
| continue; |
| } |
| |
| long buildSideSize = rbrfw.getFileSize(); |
| long probeSideSize = rprfw.getFileSize(); |
| if (buildSideSize > probeSideSize) { |
| applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw, |
| nljComparator1); |
| } else { |
| applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw, |
| nljComparator0); |
| } |
| } |
| } |
| } |
| buildSideReader.close(); |
| probeSideReader.close(); |
| } |
| } |
| |
| private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc, |
| RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger, |
| ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader) |
| throws HyracksDataException { |
| |
| ISerializableTable table = new SerializableHashTable(tabSize, ctx); |
| InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor( |
| ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(), |
| buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators), |
| isLeftOuter, nullWriters1, table); |
| |
| bReader.open(); |
| rPartbuff.clear(); |
| while (bReader.nextFrame(rPartbuff)) { |
| ByteBuffer copyBuffer = ctx.allocateFrame(); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin |
| FrameUtils.copy(rPartbuff, copyBuffer); |
| FrameUtils.makeReadable(copyBuffer); |
| joiner.build(copyBuffer); |
| rPartbuff.clear(); |
| } |
| bReader.close(); |
| rPartbuff.clear(); |
| // probe |
| pReader.open(); |
| while (pReader.nextFrame(rPartbuff)) { |
| joiner.join(rPartbuff, writer); |
| rPartbuff.clear(); |
| } |
| pReader.close(); |
| joiner.closeJoin(writer); |
| } |
| |
| private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize, |
| RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator) |
| throws HyracksDataException { |
| |
| NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd), |
| new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, false, null); |
| |
| ByteBuffer cacheBuff = ctx.allocateFrame(); |
| innerReader.open(); |
| while (innerReader.nextFrame(cacheBuff)) { |
| FrameUtils.makeReadable(cacheBuff); |
| nlj.cache(cacheBuff); |
| cacheBuff.clear(); |
| } |
| nlj.closeCache(); |
| |
| ByteBuffer joinBuff = ctx.allocateFrame(); |
| outerReader.open(); |
| |
| while (outerReader.nextFrame(joinBuff)) { |
| FrameUtils.makeReadable(joinBuff); |
| nlj.join(joinBuff, writer); |
| joinBuff.clear(); |
| } |
| |
| nlj.closeJoin(writer); |
| outerReader.close(); |
| innerReader.close(); |
| } |
| }; |
| return op; |
| } |
| } |
| } |