blob: 91f509da70b32a6ebe1a9b6408209b26b6945b0b [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.join;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
private final IHyracksTaskContext ctx;
private final Object state0Id;
private final Object state1Id;
private final int[] keys0;
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
private final INullWriterFactory[] nullWriterFactories;
private final RecordDescriptor rd0;
private final RecordDescriptor rd1;
private final int recordsPerFrame;
private final double factor;
private final int numPartitions;
private final boolean isLeftOuter;
GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
IBinaryComparatorFactory[] comparatorFactories, INullWriterFactory[] nullWriterFactories,
RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
boolean isLeftOuter) {
this.ctx = ctx;
this.state0Id = state0Id;
this.state1Id = state1Id;
this.keys0 = keys0;
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
this.nullWriterFactories = nullWriterFactories;
this.rd0 = rd0;
this.rd1 = rd1;
this.numPartitions = numPartitions;
this.recordsPerFrame = recordsPerFrame;
this.factor = factor;
this.isLeftOuter = isLeftOuter;
}
@Override
public void initialize() throws HyracksDataException {
GraceHashJoinPartitionState rState = (GraceHashJoinPartitionState) ctx.getStateObject(state0Id);
GraceHashJoinPartitionState sState = (GraceHashJoinPartitionState) ctx.getStateObject(state1Id);
RunFileWriter[] buildWriters = sState.getRunWriters();
RunFileWriter[] probeWriters = rState.getRunWriters();
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories.length; i++) {
nullWriters1[i] = nullWriterFactories[i].createNullWriter();
}
}
writer.open();// open for probe
try {
ByteBuffer buffer = ctx.allocateFrame();// input
// buffer
int tableSize = (int) (numPartitions * recordsPerFrame * factor);
ISerializableTable table = new SerializableHashTable(tableSize, ctx);
for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
RunFileWriter buildWriter = buildWriters[partitionid];
RunFileWriter probeWriter = probeWriters[partitionid];
if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
table.reset();
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
// build
if (buildWriter != null) {
RunFileReader buildReader = buildWriter.createReader();
buildReader.open();
while (buildReader.nextFrame(buffer)) {
ByteBuffer copyBuffer = ctx.allocateFrame();
FrameUtils.copy(buffer, copyBuffer);
joiner.build(copyBuffer);
buffer.clear();
}
buildReader.close();
}
// probe
RunFileReader probeReader = probeWriter.createReader();
probeReader.open();
while (probeReader.nextFrame(buffer)) {
joiner.join(buffer, writer);
buffer.clear();
}
probeReader.close();
joiner.closeJoin(writer);
}
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
} finally {
writer.close();
}
}
}