blob: 0acafeeda5ce04fdf8a7769e5b9fae8688fba6c1 [file] [log] [blame]
package edu.uci.ics.hyracks.dataflow.std.structures;
import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
 * A hash table whose header and contents live in frames obtained from the task
 * context instead of in per-entry Java objects.
 * <p>
 * Header frames hold one pair of ints per hash entry: the index of the content
 * frame in which the entry starts and the int offset of the entry inside that
 * frame. A value of -1 marks an entry that holds no tuples yet.
 * <p>
 * An entry in the content frames is laid out as: #elements (slot capacity),
 * #non-empty elements (slots in use); fIndex, tIndex; fIndex, tIndex; ....
 * Each {@code <fIndex, tIndex>} pair forms a tuple pointer. An entry that is
 * larger than one frame continues in the following content frames.
 */
public class SerializableHashTable implements ISerializableTable {

    private static final int INT_SIZE = 4;
    /** Size, in int pairs, of a freshly created entry (1 header pair + 3 tuple slots). */
    private static final int INIT_ENTRY_SIZE = 4;

    /** Header frames, allocated lazily on the first insert that touches them. */
    private final IntSerDeBuffer[] headers;
    /** Content frames holding the entries. */
    private final List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>();
    /** For each content frame, the int position at which the next entry may start. */
    private final List<Integer> frameCurrentIndex = new ArrayList<Integer>();
    private final IHyracksTaskContext ctx;
    /** Number of ints that fit in one frame. */
    private final int frameCapacity;
    /** Index of the last content frame currently in use. */
    private int currentLargestFrameIndex = 0;
    private int tupleCount = 0;
    private int headerFrameCount = 0;
    /** Scratch pointer reused while an entry is being migrated to a larger one. */
    private final TuplePointer tempTuplePointer = new TuplePointer();

    /**
     * Creates a table with room for {@code tableSize} hash entries and allocates
     * the first content frame.
     *
     * @param tableSize
     *            number of hash entries the header must be able to address
     * @param ctx
     *            task context used to query the frame size and allocate frames
     */
    public SerializableHashTable(int tableSize, final IHyracksTaskContext ctx) {
        this.ctx = ctx;
        int frameSize = ctx.getFrameSize();
        // Two ints per entry. Computed in long because tableSize * 8 overflows
        // int once tableSize exceeds 2^28.
        long headerBytes = (long) tableSize * INT_SIZE * 2;
        int residual = headerBytes % frameSize == 0 ? 0 : 1;
        int headerSize = (int) (headerBytes / frameSize) + residual;
        headers = new IntSerDeBuffer[headerSize];

        IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array());
        contents.add(frame);
        frameCurrentIndex.add(0);
        frameCapacity = frame.capacity();
    }

    /**
     * Appends {@code pointer} to the tuple list of hash entry {@code entry},
     * allocating the header frame for that entry on first use.
     */
    @Override
    public void insert(int entry, TuplePointer pointer) {
        int hFrameIndex = getHeaderFrameIndex(entry);
        int headerOffset = getHeaderFrameOffset(entry);
        IntSerDeBuffer header = headers[hFrameIndex];
        if (header == null) {
            header = new IntSerDeBuffer(ctx.allocateFrame().array());
            headers[hFrameIndex] = header;
            resetFrame(header);
            headerFrameCount++;
        }
        int frameIndex = header.getInt(headerOffset);
        int offsetIndex = header.getInt(headerOffset + 1);
        if (frameIndex < 0) {
            // insert first tuple into the entry
            insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
        } else {
            // insert non-first tuple into the entry
            insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer);
        }
        tupleCount++;
    }

    /**
     * Reads the {@code offset}-th tuple pointer stored under hash entry
     * {@code entry} into {@code dataPointer}. When the entry is empty or
     * {@code offset} is outside {@code [0, used items)}, both fields of
     * {@code dataPointer} are set to -1.
     */
    @Override
    public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
        int hFrameIndex = getHeaderFrameIndex(entry);
        int headerOffset = getHeaderFrameOffset(entry);
        IntSerDeBuffer header = headers[hFrameIndex];
        if (header == null) {
            markNotFound(dataPointer);
            return;
        }
        int frameIndex = header.getInt(headerOffset);
        int offsetIndex = header.getInt(headerOffset + 1);
        if (frameIndex < 0) {
            markNotFound(dataPointer);
            return;
        }
        IntSerDeBuffer frame = contents.get(frameIndex);
        int entryUsedItems = frame.getInt(offsetIndex + 1);
        // A negative offset would otherwise address the entry's own header ints
        // (or memory before the entry) and return them as a tuple pointer.
        if (offset < 0 || offset > entryUsedItems - 1) {
            markNotFound(dataPointer);
            return;
        }
        // Skip the two header ints, then walk forward over frames if the slot
        // lies beyond the frame in which the entry starts.
        int startIndex = offsetIndex + 2 + offset * 2;
        while (startIndex >= frameCapacity) {
            ++frameIndex;
            startIndex -= frameCapacity;
        }
        frame = contents.get(frameIndex);
        dataPointer.frameIndex = frame.getInt(startIndex);
        dataPointer.tupleIndex = frame.getInt(startIndex + 1);
    }

    /**
     * Empties the table while keeping every allocated frame for reuse: header
     * frames are refilled with -1 and all content-frame write positions go
     * back to 0.
     */
    @Override
    public void reset() {
        for (IntSerDeBuffer frame : headers) {
            if (frame != null) {
                resetFrame(frame);
            }
        }
        frameCurrentIndex.clear();
        for (int i = 0; i < contents.size(); i++) {
            frameCurrentIndex.add(0);
        }
        currentLargestFrameIndex = 0;
        tupleCount = 0;
    }

    /** Returns the number of header frames allocated so far plus the number of content frames. */
    @Override
    public int getFrameCount() {
        return headerFrameCount + contents.size();
    }

    /** Returns the number of tuple pointers inserted since construction or the last reset/close. */
    public int getTupleCount() {
        return tupleCount;
    }

    /**
     * Drops the references to all header and content frames and zeroes the
     * counters, including the header frame count so that
     * {@link #getFrameCount()} no longer reports released frames.
     */
    @Override
    public void close() {
        for (int i = 0; i < headers.length; i++) {
            headers[i] = null;
        }
        headerFrameCount = 0;
        contents.clear();
        frameCurrentIndex.clear();
        tupleCount = 0;
        currentLargestFrameIndex = 0;
    }

    /**
     * Creates a new entry of {@code entryCapacity} int pairs (one header pair
     * plus {@code entryCapacity - 1} tuple slots), stores {@code pointer} in its
     * first slot and points the hash header at it.
     */
    private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer) {
        IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
        int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
        int requiredIntCapacity = entryCapacity * 2;
        int startFrameIndex = currentLargestFrameIndex;

        if (lastIndex + requiredIntCapacity >= frameCapacity) {
            // The entry does not fit in the remainder of the current frame:
            // start it at position 0 of the next frame and claim as many
            // frames as it needs, reusing frames kept by reset() if present.
            IntSerDeBuffer newFrame;
            startFrameIndex++;
            do {
                if (currentLargestFrameIndex >= contents.size() - 1) {
                    newFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
                    currentLargestFrameIndex++;
                    contents.add(newFrame);
                    frameCurrentIndex.add(0);
                } else {
                    currentLargestFrameIndex++;
                    frameCurrentIndex.set(currentLargestFrameIndex, 0);
                }
                requiredIntCapacity -= frameCapacity;
            } while (requiredIntCapacity > 0);
            lastIndex = 0;
            lastFrame = contents.get(startFrameIndex);
        }

        // set header
        header.writeInt(headerOffset, startFrameIndex);
        header.writeInt(headerOffset + 1, lastIndex);

        // set the entry
        lastFrame.writeInt(lastIndex, entryCapacity - 1);
        lastFrame.writeInt(lastIndex + 1, 1);
        lastFrame.writeInt(lastIndex + 2, pointer.frameIndex);
        lastFrame.writeInt(lastIndex + 3, pointer.tupleIndex);

        // Advance the write position of the first frame; a frame that is
        // completely consumed is marked with frameCapacity - 1.
        int newLastIndex = lastIndex + entryCapacity * 2;
        newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1;
        frameCurrentIndex.set(startFrameIndex, newLastIndex);

        // Do the same for every further frame the entry spills into.
        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
        while (requiredIntCapacity > 0) {
            startFrameIndex++;
            requiredIntCapacity -= frameCapacity;
            newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1;
            frameCurrentIndex.set(startFrameIndex, newLastIndex);
        }
    }

    /**
     * Adds {@code pointer} to an entry that already holds at least one tuple.
     * If the entry still has a free slot the pointer is written in place;
     * otherwise a new entry of twice the size is created, the existing
     * pointers are copied into it and {@code pointer} is appended. The space of
     * the old entry is not reclaimed.
     */
    private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndex, int offsetIndex,
            TuplePointer pointer) {
        IntSerDeBuffer frame = contents.get(frameIndex);
        int entryItems = frame.getInt(offsetIndex);
        int entryUsedItems = frame.getInt(offsetIndex + 1);
        if (entryUsedItems < entryItems) {
            frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
            int startIndex = offsetIndex + 2 + entryUsedItems * 2;
            while (startIndex >= frameCapacity) {
                ++frameIndex;
                startIndex -= frameCapacity;
            }
            frame = contents.get(frameIndex);
            frame.writeInt(startIndex, pointer.frameIndex);
            frame.writeInt(startIndex + 1, pointer.tupleIndex);
        } else {
            // entryItems + 1 is the old entry size in pairs (slots + header
            // pair), so the replacement entry is twice as large.
            int capacity = (entryItems + 1) * 2;
            header.writeInt(headerOffset, -1);
            header.writeInt(headerOffset + 1, -1);

            // Seed the new entry with the first tuple of the old one.
            int fIndex = frame.getInt(offsetIndex + 2);
            int tIndex = frame.getInt(offsetIndex + 3);
            tempTuplePointer.frameIndex = fIndex;
            tempTuplePointer.tupleIndex = tIndex;
            this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer);
            int newFrameIndex = header.getInt(headerOffset);
            int newTupleIndex = header.getInt(headerOffset + 1);

            // Copy the remaining tuples of the old entry.
            for (int i = 1; i < entryUsedItems; i++) {
                int startIndex = offsetIndex + 2 + i * 2;
                int startFrameIndex = frameIndex;
                while (startIndex >= frameCapacity) {
                    ++startFrameIndex;
                    startIndex -= frameCapacity;
                }
                frame = contents.get(startFrameIndex);
                fIndex = frame.getInt(startIndex);
                tIndex = frame.getInt(startIndex + 1);
                tempTuplePointer.frameIndex = fIndex;
                tempTuplePointer.tupleIndex = tIndex;
                insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer);
            }
            // Finally add the tuple that triggered the growth.
            insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer);
        }
    }

    /** Sets both fields of {@code dataPointer} to -1 to signal "no such tuple". */
    private static void markNotFound(TuplePointer dataPointer) {
        dataPointer.frameIndex = -1;
        dataPointer.tupleIndex = -1;
    }

    /** Fills every int of {@code frame} with -1, the "empty" marker for header slots. */
    private void resetFrame(IntSerDeBuffer frame) {
        for (int i = 0; i < frameCapacity; i++) {
            frame.writeInt(i, -1);
        }
    }

    /** Returns the index of the header frame holding the int pair of {@code entry}. */
    private int getHeaderFrameIndex(int entry) {
        // long arithmetic: entry * 2 overflows int for entry >= 2^30.
        return (int) ((long) entry * 2 / frameCapacity);
    }

    /** Returns the int position of the pair of {@code entry} inside its header frame. */
    private int getHeaderFrameOffset(int entry) {
        // long arithmetic: entry * 2 overflows int for entry >= 2^30.
        return (int) ((long) entry * 2 % frameCapacity);
    }
}
/**
 * Int-addressed view over a byte array. The int at position {@code p} occupies
 * bytes {@code 4p .. 4p+3}, most significant byte first. The array is used
 * directly, not copied, so writes are visible through the caller's reference.
 */
class IntSerDeBuffer {

    private byte[] bytes;

    /**
     * @param data
     *            backing array that the ints are read from and written to
     */
    public IntSerDeBuffer(byte[] data) {
        this.bytes = data;
    }

    /** Reads the int stored at int position {@code pos}. */
    public int getInt(int pos) {
        int base = pos * 4;
        int b3 = (bytes[base] & 0xff) << 24;
        int b2 = (bytes[base + 1] & 0xff) << 16;
        int b1 = (bytes[base + 2] & 0xff) << 8;
        int b0 = bytes[base + 3] & 0xff;
        // The four parts occupy disjoint bit ranges, so OR equals the sum.
        return b3 | b2 | b1 | b0;
    }

    /** Stores {@code value} at int position {@code pos}, high byte first. */
    public void writeInt(int pos, int value) {
        int base = pos * 4;
        bytes[base] = (byte) (value >> 24);
        bytes[base + 1] = (byte) (value >> 16);
        bytes[base + 2] = (byte) (value >> 8);
        bytes[base + 3] = (byte) value;
    }

    /** Returns how many whole ints fit in the backing array. */
    public int capacity() {
        return bytes.length / 4;
    }
}