/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
 * You may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.join;

import java.nio.ByteBuffer;

import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
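
/**
 * Grace hash join. In a first pass each input is hash-partitioned on its join
 * keys into run files on disk; in a second pass the matching partition pairs
 * are joined with an {@link InMemoryHashJoin}, building on input 0 (the
 * smaller relation, "RelR") and probing with input 1 ("RelS").
 * <p>
 * A minimal wiring sketch; the scans, factories, and key positions below are
 * hypothetical placeholders, not part of this class, and a real job would
 * typically route each input through a hash-partitioning connector:
 *
 * <pre>
 * JobSpecification spec = new JobSpecification();
 * IOperatorDescriptor custScan = ...; // produces the smaller relation
 * IOperatorDescriptor ordScan = ...; // produces the larger relation
 * GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
 *         spec, 32, 200, 100, 1.2, // memsize, inputsize0, recordsPerFrame, factor
 *         new int[] { 0 }, new int[] { 1 }, // keys0 (RelR), keys1 (RelS)
 *         hashFunctionFactories, comparatorFactories, joinRecordDescriptor);
 * spec.connect(new OneToOneConnectorDescriptor(spec), custScan, 0, join, 0);
 * spec.connect(new OneToOneConnectorDescriptor(spec), ordScan, 0, join, 1);
 * </pre>
 */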
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
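    // environment keys under which each partitioning pass publishes its run files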
private static final String SMALLRELATION = "RelR";
private static final String LARGERELATION = "RelS";
private static final long serialVersionUID = 1L;
private final int[] keys0;
private final int[] keys1;
private final int inputsize0;
private final int recordsPerFrame;
private final int memsize;
private final double factor;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
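
    /**
     * @param spec                  the job specification this operator belongs to
     * @param memsize               memory budget of the joiner, in frames
     * @param inputsize0            estimated size of the build input, in frames
     * @param recordsPerFrame       expected number of records per frame
     * @param factor                fudge factor used when sizing partitions and the
     *                              in-memory hash table
     * @param keys0                 join key fields of the build input (RelR)
     * @param keys1                 join key fields of the probe input (RelS)
     * @param hashFunctionFactories hash function factories for the join keys
     * @param comparatorFactories   comparator factories for verifying key equality
     * @param recordDescriptor      descriptor of the joined output records
     */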
public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
this.recordsPerFrame = recordsPerFrame;
this.factor = factor;
this.keys0 = keys0;
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
recordDescriptors[0] = recordDescriptor;
}
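
    /*
     * Wires three activities: partition R, partition S, then join. The blocking
     * edges force each partitioning pass to finish before the next stage starts,
     * so the join only ever reads completed run files.
     */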
@Override
public void contributeTaskGraph(IActivityGraphBuilder builder) {
HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
JoinActivityNode join = new JoinActivityNode();
builder.addTask(rpart);
builder.addSourceEdge(0, rpart, 0);
builder.addTask(spart);
builder.addSourceEdge(1, spart, 0);
builder.addTask(join);
builder.addBlockingEdge(rpart, spart);
builder.addBlockingEdge(spart, join);
builder.addTargetEdge(0, join, 0);
}
public int getMemorySize() {
return memsize;
}
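
    /*
     * First pass: scans one input and hash-partitions its tuples into
     * numPartitions run files on disk.
     */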
private class HashPartitionActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
        private final String partitionsKey;
        private final int operatorInputIndex;
        private final int[] keys;

        public HashPartitionActivityNode(String partitionsKey, int[] keys, int operatorInputIndex) {
this.partitionsKey = partitionsKey;
this.keys = keys;
this.operatorInputIndex = operatorInputIndex;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
final IOperatorEnvironment env, final IRecordDescriptorProvider recordDescProvider, int partition,
final int nPartitions) {
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(),
recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
private final ITuplePartitionComputer hpc = new FieldHashPartitionComputerFactory(keys,
hashFunctionFactories).createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private ByteBuffer[] outbufs;
private RunFileWriter[] fWriters;
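                // classic Grace sizing: roughly sqrt(|R| * fudge factor) partitions,
                // with the build size divided across the parallel tasks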
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
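
                // flush each partition's last partially-filled frame, close the
                // run files, and publish the writers through the environment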
@Override
public void close() throws HyracksDataException {
for (int i = 0; i < numPartitions; i++) {
ByteBuffer head = outbufs[i];
accessor0.reset(head);
if (accessor0.getTupleCount() > 0) {
write(i, head);
}
closeWriter(i);
}
env.set(partitionsKey, fWriters);
}
private void closeWriter(int i) throws HyracksDataException {
RunFileWriter writer = fWriters[i];
if (writer != null) {
writer.close();
}
}
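
                // lazily creates the run file for partition i on its first spill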
private void write(int i, ByteBuffer head) throws HyracksDataException {
RunFileWriter writer = fWriters[i];
if (writer == null) {
FileReference file = ctx.getJobletContext().createWorkspaceFile(partitionsKey);
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
fWriters[i] = writer;
}
writer.nextFrame(head);
}

                @Override
                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                    accessor0.reset(buffer);
                    int tCount = accessor0.getTupleCount();
                    for (int i = 0; i < tCount; ++i) {
                        int entry = hpc.partition(accessor0, i, numPartitions);
                        ByteBuffer outbuf = outbufs[entry];
                        // resume appending to this partition's frame without
                        // discarding the tuples already buffered in it
                        appender.reset(outbuf, false);
                        if (!appender.append(accessor0, i)) {
                            // the frame is full, i.e. the tuple does not fit --
                            // spill the frame to disk and retry on the empty frame
                            write(entry, outbuf);
                            outbuf.clear();
                            appender.reset(outbuf, true);
                            if (!appender.append(accessor0, i)) {
                                throw new HyracksDataException("Record does not fit into an empty frame");
                            }
                        }
                    }
                }

                @Override
                public void open() throws HyracksDataException {
                    outbufs = new ByteBuffer[numPartitions];
                    fWriters = new RunFileWriter[numPartitions];
                    for (int i = 0; i < numPartitions; i++) {
                        outbufs[i] = ctx.allocateFrame();
                        // mark each partition frame as empty
                        appender.reset(outbufs[i], true);
                    }
                }

                @Override
                public void flush() throws HyracksDataException {
                    // nothing to do: partition frames are spilled as they fill
                }
};
return op;
}
@Override
public IOperatorDescriptor getOwner() {
return GraceHashJoinOperatorDescriptor.this;
}
}
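
    /*
     * Second pass: for each partition pair (R_i, S_i), builds an in-memory hash
     * table from R_i and probes it with the tuples of S_i.
     */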
private class JoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
final IOperatorEnvironment env, final IRecordDescriptorProvider recordDescProvider, int partition,
final int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
private InMemoryHashJoin joiner;
                private RunFileWriter[] rWriters; // build-side (RelR) partition run files
                private RunFileWriter[] sWriters; // probe-side (RelS) partition run files
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
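
                // fetch the run files written by the partitioning activities and
                // join each (R_i, S_i) partition pair in turn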
@Override
public void initialize() throws HyracksDataException {
rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
sWriters = (RunFileWriter[]) env.get(LARGERELATION);
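
                    // rehash within each partition so its tuples spread across
                    // the in-memory hash table instead of clustering in one bucket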
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();

                    writer.open(); // open the downstream writer for the join output
                    ByteBuffer buffer = ctx.allocateFrame(); // shared frame for reading run files
                    // size the in-memory hash table for one partition's expected tuples
                    int tableSize = (int) (numPartitions * recordsPerFrame * factor);
for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
RunFileWriter rWriter = rWriters[partitionid];
RunFileWriter sWriter = sWriters[partitionid];
if (rWriter == null || sWriter == null) {
continue;
}
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
new FrameTuplePairComparator(keys0, keys1, comparators));
                        // build: load partition R_i into the in-memory hash table
                        RunFileReader rReader = rWriter.createReader();
                        rReader.open();
                        while (rReader.nextFrame(buffer)) {
                            // the joiner keeps a reference to each build frame,
                            // so copy it out of the shared read buffer first
                            ByteBuffer copyBuffer = ctx.allocateFrame();
                            FrameUtils.copy(buffer, copyBuffer);
                            joiner.build(copyBuffer);
                            buffer.clear();
                        }
rReader.close();
                        // probe: stream partition S_i against the hash table
RunFileReader sReader = sWriter.createReader();
sReader.open();
while (sReader.nextFrame(buffer)) {
joiner.join(buffer, writer);
buffer.clear();
}
sReader.close();
joiner.closeJoin(writer);
}
writer.close();
}

                @Override
                public void deinitialize() throws HyracksDataException {
                    // drop the references to the partition run files
                    env.set(LARGERELATION, null);
                    env.set(SMALLRELATION, null);
                }
};
return op;
}
@Override
public IOperatorDescriptor getOwner() {
return GraceHashJoinOperatorDescriptor.this;
}
}
}