blob: d1d528cf912f81bde0ade68ce458d6f47cfe4a34 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.join;
import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
/**
 * Hash join that keeps the whole build side in memory.
 *
 * <p>Usage, as far as this class shows: call {@link #build(ByteBuffer)} once per build-side
 * frame, then {@link #join(ByteBuffer, IFrameWriter)} once per probe-side frame, and finally
 * {@link #closeJoin(IFrameWriter)} to emit the last, partially filled output frame.
 *
 * <p>Build tuples are referenced through "tuple pointers": a {@code long} whose upper 32 bits are
 * the index of the frame in {@link #buffers} and whose lower 32 bits are the index of the tuple
 * inside that frame.
 */
public class InMemoryHashJoin {
    /** Hash table; each slot lazily receives the list of tuple pointers hashing to it. */
    private final Link[] table;
    /** Build-side frames, stored by reference in the order they were passed to {@link #build}. */
    private final List<ByteBuffer> buffers;
    private final FrameTupleAccessor accessorBuild;
    private final ITuplePartitionComputer tpcBuild;
    private final FrameTupleAccessor accessorProbe;
    private final ITuplePartitionComputer tpcProbe;
    /** Appends result tuples into {@link #outBuffer}. */
    private final FrameTupleAppender appender;
    private final FrameTuplePairComparator tpComparator;
    /** Single output frame that is filled, handed to the writer and then reused. */
    private final ByteBuffer outBuffer;
    private final boolean isLeftOuter;
    /** Pre-built all-null build-side tuple; {@code null} unless this is a left outer join. */
    private final ArrayTupleBuilder nullTupleBuild;

    /**
     * Creates an empty join table.
     *
     * @param ctx          supplies the frame size and the output frame
     * @param tableSize    number of hash buckets
     * @param accessor0    accessor used for probe-side frames
     * @param tpc0         partition (hash) computer for probe-side tuples
     * @param accessor1    accessor used for build-side frames
     * @param tpc1         partition (hash) computer for build-side tuples
     * @param comparator   compares a probe tuple with a build tuple; 0 means "match"
     * @param isLeftOuter  when {@code true}, probe tuples without any match are emitted padded
     *                     with nulls for the build-side fields
     * @param nullWriters1 one null writer per build-side field; only read when
     *                     {@code isLeftOuter} is {@code true}
     * @throws HyracksDataException declared by the signature; thrown by the invoked helpers
     */
    public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
            ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
            throws HyracksDataException {
        table = new Link[tableSize];
        buffers = new ArrayList<ByteBuffer>();
        this.accessorBuild = accessor1;
        this.tpcBuild = tpc1;
        this.accessorProbe = accessor0;
        this.tpcProbe = tpc0;
        appender = new FrameTupleAppender(ctx.getFrameSize());
        tpComparator = comparator;
        outBuffer = ctx.allocateFrame();
        appender.reset(outBuffer, true);
        this.isLeftOuter = isLeftOuter;
        if (isLeftOuter) {
            // Materialize the null padding once so that it can be appended for every
            // unmatched probe tuple without re-invoking the null writers.
            int fieldCountOuter = accessor1.getFieldCount();
            nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
            DataOutput out = nullTupleBuild.getDataOutput();
            for (int i = 0; i < fieldCountOuter; i++) {
                nullWriters1[i].writeNull(out);
                nullTupleBuild.addFieldEndOffset();
            }
        } else {
            nullTupleBuild = null;
        }
    }

    /**
     * Adds every tuple of a build-side frame to the hash table.
     *
     * <p>The frame is stored by reference (it is not copied) and is read again by
     * {@link #join(ByteBuffer, IFrameWriter)}.
     *
     * @param buffer build-side frame
     * @throws HyracksDataException if the partition computer fails
     */
    public void build(ByteBuffer buffer) throws HyracksDataException {
        buffers.add(buffer);
        int bIndex = buffers.size() - 1;
        accessorBuild.reset(buffer);
        int tCount = accessorBuild.getTupleCount();
        for (int i = 0; i < tCount; ++i) {
            int entry = tpcBuild.partition(accessorBuild, i, table.length);
            // Pack (frame index, tuple index) into one long; i is never negative here,
            // so the addition cannot disturb the upper half.
            long tPointer = (((long) bIndex) << 32) + i;
            Link link = table[entry];
            if (link == null) {
                link = table[entry] = new Link();
            }
            link.add(tPointer);
        }
    }

    /**
     * Probes the hash table with every tuple of a probe-side frame and writes the join results.
     *
     * <p>For each probe tuple, all build tuples in the same bucket are compared; each pair for
     * which the comparator returns 0 is emitted as the concatenation probe + build. For a left
     * outer join, a probe tuple with no match is emitted concatenated with the null tuple.
     * Full output frames are passed to {@code writer}; a partially filled frame stays buffered
     * until the next flush or {@link #closeJoin(IFrameWriter)}.
     *
     * @param buffer probe-side frame
     * @param writer receives full output frames
     * @throws HyracksDataException  if partitioning or writing fails
     * @throws IllegalStateException if a single result tuple does not fit into an empty frame
     */
    public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
        accessorProbe.reset(buffer);
        int tupleCount0 = accessorProbe.getTupleCount();
        for (int i = 0; i < tupleCount0; ++i) {
            int entry = tpcProbe.partition(accessorProbe, i, table.length);
            Link link = table[entry];
            boolean matchFound = false;
            if (link != null) {
                for (int j = 0; j < link.size; ++j) {
                    long pointer = link.pointers[j];
                    // Unpack the tuple pointer. (The previous "& 0xffffffff" was a no-op:
                    // the int literal widens to -1L. The narrowing casts do the real work.)
                    int bIndex = (int) (pointer >>> 32);
                    int tIndex = (int) pointer;
                    accessorBuild.reset(buffers.get(bIndex));
                    if (tpComparator.compare(accessorProbe, i, accessorBuild, tIndex) == 0) {
                        matchFound = true;
                        appendMatch(i, tIndex, writer);
                    }
                }
            }
            if (!matchFound && isLeftOuter) {
                appendNullPadded(i, writer);
            }
        }
    }

    /**
     * Flushes the buffered output frame, if it holds any tuples, and clears it.
     *
     * <p>The appender is reset after the flush so that calling this method again, or calling
     * {@link #join(ByteBuffer, IFrameWriter)} afterwards, does not emit the same tuples twice.
     *
     * @param writer receives the last output frame
     * @throws HyracksDataException if writing fails
     */
    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
        if (appender.getTupleCount() > 0) {
            flushFrame(outBuffer, writer);
            appender.reset(outBuffer, true);
        }
    }

    /**
     * Appends probe tuple {@code probeIndex} concatenated with build tuple {@code buildIndex}
     * (both read through the currently positioned accessors), flushing the output frame once
     * if it is full.
     */
    private void appendMatch(int probeIndex, int buildIndex, IFrameWriter writer) throws HyracksDataException {
        if (!appender.appendConcat(accessorProbe, probeIndex, accessorBuild, buildIndex)) {
            flushFrame(outBuffer, writer);
            appender.reset(outBuffer, true);
            if (!appender.appendConcat(accessorProbe, probeIndex, accessorBuild, buildIndex)) {
                throw new IllegalStateException("Joined tuple does not fit into an empty output frame");
            }
        }
    }

    /**
     * Appends probe tuple {@code probeIndex} concatenated with the pre-built null tuple,
     * flushing the output frame once if it is full. Only called for left outer joins, where
     * {@link #nullTupleBuild} is non-null.
     */
    private void appendNullPadded(int probeIndex, IFrameWriter writer) throws HyracksDataException {
        if (!appender.appendConcat(accessorProbe, probeIndex, nullTupleBuild.getFieldEndOffsets(),
                nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
            flushFrame(outBuffer, writer);
            appender.reset(outBuffer, true);
            if (!appender.appendConcat(accessorProbe, probeIndex, nullTupleBuild.getFieldEndOffsets(),
                    nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
                throw new IllegalStateException("Null-padded tuple does not fit into an empty output frame");
            }
        }
    }

    /**
     * Hands {@code buffer} to {@code writer}, exposing its full capacity (position 0, limit =
     * capacity) both before the call and again afterwards.
     */
    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
        buffer.position(0);
        buffer.limit(buffer.capacity());
        writer.nextFrame(buffer);
        buffer.position(0);
        buffer.limit(buffer.capacity());
    }

    /** Growable list of tuple pointers belonging to one hash bucket. */
    private static class Link {
        private static final int INIT_POINTERS_SIZE = 8;

        /** Backing array; only the first {@link #size} entries are valid. */
        long[] pointers;
        int size;

        Link() {
            pointers = new long[INIT_POINTERS_SIZE];
            size = 0;
        }

        /** Appends a pointer, doubling the backing array when it is full. */
        void add(long pointer) {
            if (size >= pointers.length) {
                pointers = Arrays.copyOf(pointers, pointers.length * 2);
            }
            pointers[size++] = pointer;
        }
    }
}