| package edu.uci.ics.hyracks.dataflow.std.structures; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; |
| |
| /** |
| * An entry in the table is: #elements, #no-empty elements; fIndex, tIndex; |
| * fIndex, tIndex; .... <fIndex, tIndex> forms a tuple pointer |
| */ |
| public class SerializableHashTable implements ISerializableTable { |
| |
| private static final int INT_SIZE = 4; |
| private static final int INIT_ENTRY_SIZE = 4; |
| |
| private IntSerDeBuffer[] headers; |
| private List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>(); |
| private List<Integer> frameCurrentIndex = new ArrayList<Integer>(); |
| private final IHyracksTaskContext ctx; |
| private int frameCapacity = 0; |
| private int currentLargestFrameIndex = 0; |
| private int tupleCount = 0; |
| private int headerFrameCount = 0; |
| private TuplePointer tempTuplePointer = new TuplePointer(); |
| |
| public SerializableHashTable(int tableSize, final IHyracksTaskContext ctx) { |
| this.ctx = ctx; |
| int frameSize = ctx.getFrameSize(); |
| |
| int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1; |
| int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual; |
| headers = new IntSerDeBuffer[headerSize]; |
| |
| IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array()); |
| contents.add(frame); |
| frameCurrentIndex.add(0); |
| frameCapacity = frame.capacity(); |
| } |
| |
| @Override |
| public void insert(int entry, TuplePointer pointer) { |
| int hFrameIndex = getHeaderFrameIndex(entry); |
| int headerOffset = getHeaderFrameOffset(entry); |
| IntSerDeBuffer header = headers[hFrameIndex]; |
| if (header == null) { |
| header = new IntSerDeBuffer(ctx.allocateFrame().array()); |
| headers[hFrameIndex] = header; |
| resetFrame(header); |
| headerFrameCount++; |
| } |
| int frameIndex = header.getInt(headerOffset); |
| int offsetIndex = header.getInt(headerOffset + 1); |
| if (frameIndex < 0) { |
| // insert first tuple into the entry |
| insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer); |
| } else { |
| // insert non-first tuple into the entry |
| insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer); |
| } |
| tupleCount++; |
| } |
| |
| @Override |
| public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) { |
| int hFrameIndex = getHeaderFrameIndex(entry); |
| int headerOffset = getHeaderFrameOffset(entry); |
| IntSerDeBuffer header = headers[hFrameIndex]; |
| if (header == null) { |
| dataPointer.frameIndex = -1; |
| dataPointer.tupleIndex = -1; |
| return; |
| } |
| int frameIndex = header.getInt(headerOffset); |
| int offsetIndex = header.getInt(headerOffset + 1); |
| if (frameIndex < 0) { |
| dataPointer.frameIndex = -1; |
| dataPointer.tupleIndex = -1; |
| return; |
| } |
| IntSerDeBuffer frame = contents.get(frameIndex); |
| int entryUsedItems = frame.getInt(offsetIndex + 1); |
| if (offset > entryUsedItems - 1) { |
| dataPointer.frameIndex = -1; |
| dataPointer.tupleIndex = -1; |
| return; |
| } |
| int startIndex = offsetIndex + 2 + offset * 2; |
| while (startIndex >= frameCapacity) { |
| ++frameIndex; |
| startIndex -= frameCapacity; |
| } |
| frame = contents.get(frameIndex); |
| dataPointer.frameIndex = frame.getInt(startIndex); |
| dataPointer.tupleIndex = frame.getInt(startIndex + 1); |
| } |
| |
| @Override |
| public void reset() { |
| for (IntSerDeBuffer frame : headers) |
| if (frame != null) |
| resetFrame(frame); |
| |
| frameCurrentIndex.clear(); |
| for (int i = 0; i < contents.size(); i++) { |
| frameCurrentIndex.add(0); |
| } |
| |
| currentLargestFrameIndex = 0; |
| tupleCount = 0; |
| } |
| |
| @Override |
| public int getFrameCount() { |
| return headerFrameCount + contents.size(); |
| } |
| |
| public int getTupleCount() { |
| return tupleCount; |
| } |
| |
| @Override |
| public void close() { |
| for (int i = 0; i < headers.length; i++) |
| headers[i] = null; |
| contents.clear(); |
| frameCurrentIndex.clear(); |
| tupleCount = 0; |
| currentLargestFrameIndex = 0; |
| } |
| |
| private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer) { |
| IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex); |
| int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex); |
| int requiredIntCapacity = entryCapacity * 2; |
| int startFrameIndex = currentLargestFrameIndex; |
| |
| if (lastIndex + requiredIntCapacity >= frameCapacity) { |
| IntSerDeBuffer newFrame; |
| startFrameIndex++; |
| do { |
| if (currentLargestFrameIndex >= contents.size() - 1) { |
| newFrame = new IntSerDeBuffer(ctx.allocateFrame().array()); |
| currentLargestFrameIndex++; |
| contents.add(newFrame); |
| frameCurrentIndex.add(0); |
| } else { |
| currentLargestFrameIndex++; |
| frameCurrentIndex.set(currentLargestFrameIndex, 0); |
| } |
| requiredIntCapacity -= frameCapacity; |
| } while (requiredIntCapacity > 0); |
| lastIndex = 0; |
| lastFrame = contents.get(startFrameIndex); |
| } |
| |
| // set header |
| header.writeInt(headerOffset, startFrameIndex); |
| header.writeInt(headerOffset + 1, lastIndex); |
| |
| // set the entry |
| lastFrame.writeInt(lastIndex, entryCapacity - 1); |
| lastFrame.writeInt(lastIndex + 1, 1); |
| lastFrame.writeInt(lastIndex + 2, pointer.frameIndex); |
| lastFrame.writeInt(lastIndex + 3, pointer.tupleIndex); |
| int newLastIndex = lastIndex + entryCapacity * 2; |
| newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1; |
| frameCurrentIndex.set(startFrameIndex, newLastIndex); |
| |
| requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex); |
| while (requiredIntCapacity > 0) { |
| startFrameIndex++; |
| requiredIntCapacity -= frameCapacity; |
| newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1; |
| frameCurrentIndex.set(startFrameIndex, newLastIndex); |
| } |
| } |
| |
| private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndex, int offsetIndex, |
| TuplePointer pointer) { |
| IntSerDeBuffer frame = contents.get(frameIndex); |
| int entryItems = frame.getInt(offsetIndex); |
| int entryUsedItems = frame.getInt(offsetIndex + 1); |
| |
| if (entryUsedItems < entryItems) { |
| frame.writeInt(offsetIndex + 1, entryUsedItems + 1); |
| int startIndex = offsetIndex + 2 + entryUsedItems * 2; |
| while (startIndex >= frameCapacity) { |
| ++frameIndex; |
| startIndex -= frameCapacity; |
| } |
| frame = contents.get(frameIndex); |
| frame.writeInt(startIndex, pointer.frameIndex); |
| frame.writeInt(startIndex + 1, pointer.tupleIndex); |
| } else { |
| int capacity = (entryItems + 1) * 2; |
| header.writeInt(headerOffset, -1); |
| header.writeInt(headerOffset + 1, -1); |
| int fIndex = frame.getInt(offsetIndex + 2); |
| int tIndex = frame.getInt(offsetIndex + 3); |
| tempTuplePointer.frameIndex = fIndex; |
| tempTuplePointer.tupleIndex = tIndex; |
| this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer); |
| int newFrameIndex = header.getInt(headerOffset); |
| int newTupleIndex = header.getInt(headerOffset + 1); |
| |
| for (int i = 1; i < entryUsedItems; i++) { |
| int startIndex = offsetIndex + 2 + i * 2; |
| int startFrameIndex = frameIndex; |
| while (startIndex >= frameCapacity) { |
| ++startFrameIndex; |
| startIndex -= frameCapacity; |
| } |
| frame = contents.get(startFrameIndex); |
| fIndex = frame.getInt(startIndex); |
| tIndex = frame.getInt(startIndex + 1); |
| tempTuplePointer.frameIndex = fIndex; |
| tempTuplePointer.tupleIndex = tIndex; |
| insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer); |
| } |
| insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer); |
| } |
| } |
| |
| private void resetFrame(IntSerDeBuffer frame) { |
| for (int i = 0; i < frameCapacity; i++) |
| frame.writeInt(i, -1); |
| } |
| |
| private int getHeaderFrameIndex(int entry) { |
| int frameIndex = entry * 2 / frameCapacity; |
| return frameIndex; |
| } |
| |
| private int getHeaderFrameOffset(int entry) { |
| int offset = entry * 2 % frameCapacity; |
| return offset; |
| } |
| |
| } |
| |
| class IntSerDeBuffer { |
| |
| private byte[] bytes; |
| |
| public IntSerDeBuffer(byte[] data) { |
| this.bytes = data; |
| } |
| |
| public int getInt(int pos) { |
| int offset = pos * 4; |
| return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8) |
| + ((bytes[offset + 3] & 0xff) << 0); |
| } |
| |
| public void writeInt(int pos, int value) { |
| int offset = pos * 4; |
| bytes[offset++] = (byte) (value >> 24); |
| bytes[offset++] = (byte) (value >> 16); |
| bytes[offset++] = (byte) (value >> 8); |
| bytes[offset++] = (byte) (value); |
| } |
| |
| public int capacity() { |
| return bytes.length / 4; |
| } |
| } |