blob: c143eaf30d20327562eae5fd56f44a96107812ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.dataflow.std.sort.util;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.IntSerDeUtils;
/**
 * This is a special frame which is used in TupleMemoryBuffer.
 * This frame has a special structure to organize the deleted spaces.
 * Specifically, the end offset of a deleted tuple is stored as a negative number,
 * and a special <code>deleted_space</code> field in the frame trailer
 * remembers how many bytes have been deleted.
 * Each index slot stores both the start and the end offset of its tuple because tuples may be out of
 * order after several add, remove and reorganize operations.
 * A frame is formatted with tuple data concatenated starting at offset 0,
 * one tuple after another.
 * Offset FS - 4 holds an int indicating the number of index slots (<code>index_count</code>, N) in the frame.
 * Offset FS - 8 holds an int indicating the amount of <code>deleted_space</code> in the frame.
 * Offset FS - 12 holds an int indicating <code>next_index</code>, the index slot the next appended tuple will use.
 * Offset FS - 16 holds an int indicating <code>tuple_append</code>, the offset at which the next tuple's bytes
 * are written.
 * FS - 16 - ((i + 1) * (4 + 4)) for i from 0 to N - 1 holds two ints indicating
 * the end offset and the start offset of the (i + 1)^th tuple.
 * Both offsets are measured from the start of the frame.
 * A tuple has been deleted if its stored end offset is not positive.
 *
 * <pre>
 * [ *tuple_1_bytes*,
 *   *tuple_2_bytes*,
 *   ...
 *   int end_offset, int start_offset, # tuple 2
 *   int end_offset, int start_offset, # tuple 1
 *   int tuple_append,
 *   int next_index,
 *   int deleted_space,
 *   int index_count,
 * ]
 * </pre>
 *
 * The same layout expressed as byte offsets, where FS is the frame size and i is the 0-based slot index:
 *
 * <pre>
 * [ 0 ...               tuple bytes,
 *   ...
 *   FS - 24 - 8 * i:    int end_offset of tuple i,
 *   FS - 20 - 8 * i:    int start_offset of tuple i,
 *   FS - 16:            int tuple_append,
 *   FS - 12:            int next_index,
 *   FS - 8:             int deleted_space,
 *   FS - 4:             int index_count,
 * ]
 * </pre>
 */
public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAccessor {

    /**
     * Orders pairs by ascending right-hand value ({@link Pair#getValue()}).
     * {@link Integer#compare(int, int)} is used instead of subtraction so the result can never overflow.
     */
    public static final Comparator<Pair<Integer, Integer>> INDEX_OFFSET_ASC_COMPARATOR =
            new Comparator<Pair<Integer, Integer>>() {
                @Override
                public int compare(Pair<Integer, Integer> p1, Pair<Integer, Integer> p2) {
                    return Integer.compare(p1.getValue(), p2.getValue());
                }
            };

    // Sizes, in bytes, of the trailer fields and of the two ints that make up one index slot.
    private static final int SIZE_INDEX_COUNT = 4;
    private static final int SIZE_DELETED_SPACE = 4;
    private static final int SIZE_NEXT_INDEX = 4;
    private static final int SIZE_TUPLE_APPEND = 4;
    private static final int SIZE_START_OFFSET = 4;
    private static final int SIZE_END_OFFSET = 4;
    private static final int SIZE_OFFSET_GROUP = SIZE_END_OFFSET + SIZE_START_OFFSET;

    private final RecordDescriptor recordDescriptor;
    private ByteBuffer buffer;
    // First byte past the index-slot area; slot i occupies
    // [indexSlotsOffset - SIZE_OFFSET_GROUP * (i + 1), indexSlotsOffset - SIZE_OFFSET_GROUP * i).
    private int indexSlotsOffset;
    // In-memory mirrors of the trailer fields; every mutation also writes the value back to the frame.
    private int indexCount;
    private int tupleAppend;
    private int deletedSpace;
    private int nextIndex;
    private byte[] array; // to speed up the array visit a little
    private final IntegerPairPool ipp = new IntegerPairPool();
    // Scratch queue used only inside reOrganizeBuffer(); it is cleared before that method returns.
    private final PriorityQueue<IntegerPair> reorganizeQueue;

    /**
     * Creates an appender for frames whose tuples follow the given record layout.
     *
     * @param recordDescriptor
     *            descriptor used to report the field count of stored tuples
     */
    public DeletableFrameTupleAppender(RecordDescriptor recordDescriptor) {
        this.recordDescriptor = recordDescriptor;
        reorganizeQueue = new PriorityQueue<>(16, IntegerPair.RIGHT_ASC_COMPARATOR);
    }

    /** Reads the <code>index_count</code> trailer field from the frame. */
    private int getIndexCount() {
        return IntSerDeUtils.getInt(array, getIndexCountOffset());
    }

    /** Writes the <code>index_count</code> trailer field to the frame. */
    private void setIndexCount(int count) {
        IntSerDeUtils.putInt(array, getIndexCountOffset(), count);
    }

    /** Byte offset of <code>index_count</code>: the last 4 bytes of the frame. */
    private int getIndexCountOffset() {
        return buffer.capacity() - SIZE_INDEX_COUNT;
    }

    /** Reads the <code>deleted_space</code> trailer field from the frame. */
    private int getDeletedSpace() {
        return IntSerDeUtils.getInt(array, getDeletedSpaceOffset());
    }

    /** Writes the <code>deleted_space</code> trailer field to the frame. */
    private void setDeletedSpace(int space) {
        IntSerDeUtils.putInt(array, getDeletedSpaceOffset(), space);
    }

    /** Byte offset of <code>deleted_space</code>, directly before <code>index_count</code>. */
    private int getDeletedSpaceOffset() {
        return getIndexCountOffset() - SIZE_DELETED_SPACE;
    }

    /** Reads the <code>next_index</code> trailer field from the frame. */
    private int getNextIndex() {
        return IntSerDeUtils.getInt(array, getNextIndexOffset());
    }

    /** Writes the <code>next_index</code> trailer field to the frame. */
    private void setNextIndex(int index) {
        IntSerDeUtils.putInt(array, getNextIndexOffset(), index);
    }

    /** Byte offset of <code>next_index</code>, directly before <code>deleted_space</code>. */
    private int getNextIndexOffset() {
        return getDeletedSpaceOffset() - SIZE_NEXT_INDEX;
    }

    /**
     * Returns the current free slot index and advances {@code nextIndex} to the next slot that is either
     * deleted (end offset not positive) or past the last used slot. The new value is persisted to the frame.
     */
    private int getAndUpdateNextIndex() {
        int index = nextIndex;
        nextIndex = index + 1;
        while (nextIndex < indexCount) {
            if (getTupleEndOffset(nextIndex) <= 0) {
                break;
            }
            nextIndex++;
        }
        setNextIndex(nextIndex);
        return index;
    }

    /** Reads the <code>tuple_append</code> trailer field from the frame. */
    private int getTupleAppend() {
        return IntSerDeUtils.getInt(array, getTupleAppendOffset());
    }

    /** Writes the <code>tuple_append</code> trailer field to the frame. */
    private void setTupleAppend(int offset) {
        IntSerDeUtils.putInt(array, getTupleAppendOffset(), offset);
    }

    /** Byte offset of <code>tuple_append</code>, directly before <code>next_index</code>. */
    private int getTupleAppendOffset() {
        return getNextIndexOffset() - SIZE_TUPLE_APPEND;
    }

    /** Index slots grow downward from the <code>tuple_append</code> field. */
    private int getIndexSlotOffset() {
        return getTupleAppendOffset();
    }

    /**
     * Binds this appender to {@code buffer} and formats it as an empty frame
     * (all trailer fields set to 0).
     */
    @Override
    public void clear(ByteBuffer buffer) throws HyracksDataException {
        this.buffer = buffer;
        this.array = buffer.array();
        setIndexCount(0);
        setDeletedSpace(0);
        setNextIndex(0);
        setTupleAppend(0);
        resetCounts();
    }

    /**
     * Binds this appender to an already formatted {@code buffer}, loading the cached trailer fields from it.
     */
    @Override
    public void reset(ByteBuffer buffer) {
        this.buffer = buffer;
        this.array = buffer.array();
        resetCounts();
    }

    /** Reloads every cached trailer field from the currently bound frame. */
    private void resetCounts() {
        indexSlotsOffset = getIndexSlotOffset();
        deletedSpace = getDeletedSpace();
        indexCount = getIndexCount();
        tupleAppend = getTupleAppend();
        nextIndex = getNextIndex();
    }

    /**
     * Append the record into the frame. This method does not validate the space; callers must make sure
     * enough space is available by calling {@link #getContiguousFreeSpace()} first.
     *
     * @param tupleAccessor
     *            accessor over the source frame
     * @param tIndex
     *            index of the tuple to copy from {@code tupleAccessor}
     * @return the index slot assigned to the appended tuple (a reused deleted slot or a new one)
     * @throws HyracksDataException
     *             if the source accessor fails
     */
    @Override
    public int append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
        byte[] src = tupleAccessor.getBuffer().array();
        int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
        int length = tupleAccessor.getTupleLength(tIndex);
        System.arraycopy(src, tStartOffset, array, tupleAppend, length);
        int index = getAndUpdateNextIndex();
        if (index >= indexCount) {
            // No deleted slot was reused: a brand-new slot is appended to the slot area.
            setIndexCount(++indexCount);
        }
        setTupleOffsets(index, tupleAppend, length);
        tupleAppend += length;
        setTupleAppend(tupleAppend);
        return index;
    }

    /**
     * Marks the tuple at {@code tupleIndex} as deleted by negating its end offset, records the freed bytes
     * in <code>deleted_space</code>, and makes the slot the next one to be reused if it precedes the current
     * {@code nextIndex}. Deleting an already deleted tuple is a no-op.
     */
    @Override
    public void delete(int tupleIndex) {
        int endOffset = getTupleEndOffset(tupleIndex);
        if (endOffset > 0) {
            setTupleEndOffset(tupleIndex, -endOffset);
            deletedSpace += endOffset - getTupleStartOffset(tupleIndex);
            setDeletedSpace(deletedSpace);
            if (nextIndex > tupleIndex) {
                nextIndex = tupleIndex;
                setNextIndex(nextIndex);
            }
        }
    }

    /**
     * Compacts live tuple bytes to the front of the frame, preserving their relative order by start offset,
     * and resets <code>deleted_space</code> to 0. Trailing deleted slots are dropped first; deleted slots in the
     * middle keep their (negative) end offsets so their indexes stay reusable.
     */
    @Override
    public void reOrganizeBuffer() {
        if (deletedSpace <= 0) {
            return;
        }
        reclaimDeletedEnding();
        // Queue every live tuple ordered by its current start offset so that moving them towards offset 0,
        // in that order, never overwrites bytes that have not been moved yet.
        IntegerPair ip;
        int endOffset;
        int startOffset;
        for (int i = 0; i < indexCount; i++) {
            endOffset = getTupleEndOffset(i);
            if (endOffset > 0) {
                ip = ipp.takeOne();
                ip.reset(i, getTupleStartOffset(i));
                reorganizeQueue.add(ip);
            }
        }
        int index;
        tupleAppend = 0;
        while (!reorganizeQueue.isEmpty()) {
            ip = reorganizeQueue.remove();
            index = ip.getLeft();
            startOffset = getTupleStartOffset(index);
            endOffset = getTupleEndOffset(index);
            if (endOffset >= 0) {
                int length = endOffset - startOffset;
                assert length >= 0;
                if (tupleAppend != startOffset) {
                    System.arraycopy(array, startOffset, array, tupleAppend, length);
                }
                setTupleOffsets(index, tupleAppend, length);
                tupleAppend += length;
            }
            ipp.giveBack(ip);
        }
        setTupleAppend(tupleAppend);
        deletedSpace = 0;
        setDeletedSpace(0);
        // Clean up
        reorganizeQueue.clear();
    }

    /**
     * Drops trailing deleted slots by shrinking {@code indexCount}, and clamps {@code nextIndex} so it never
     * points past the last slot. Both the cached fields and the frame trailer are updated.
     */
    private void reclaimDeletedEnding() {
        while (indexCount > 0 && getTupleEndOffset(indexCount - 1) <= 0) {
            indexCount--;
        }
        setIndexCount(indexCount);
        if (nextIndex > indexCount) {
            // Keep the cached field in sync with the frame; updating only the frame would let a later
            // append() hand out a slot index beyond indexCount.
            nextIndex = indexCount;
            setNextIndex(nextIndex);
        }
    }

    /** Returns the contiguous free space plus the bytes held by deleted tuples. */
    @Override
    public int getTotalFreeSpace() {
        return getContiguousFreeSpace() + deletedSpace;
    }

    /**
     * Returns the number of bytes between the end of the tuple data and the start of the index-slot area.
     * Note that appending into a new (not reused) slot also consumes {@code SIZE_OFFSET_GROUP} bytes of slot space.
     */
    @Override
    public int getContiguousFreeSpace() {
        int slotSpace = indexCount * SIZE_OFFSET_GROUP;
        return indexSlotsOffset - tupleAppend - slotSpace;
    }

    @Override
    public int getFieldCount() {
        return recordDescriptor.getFieldCount();
    }

    /** Size in bytes of the per-tuple field-end-offset header (one int per field). */
    @Override
    public int getFieldSlotsLength() {
        return recordDescriptor.getFieldCount() * 4;
    }

    /** End offset of field {@code fIdx}, relative to the end of the tuple's field-slot header. */
    @Override
    public int getFieldEndOffset(int tupleIndex, int fIdx) {
        return IntSerDeUtils.getInt(array, getTupleStartOffset(tupleIndex) + fIdx * 4);
    }

    /** Start offset of field {@code fIdx}, relative to the end of the tuple's field-slot header. */
    @Override
    public int getFieldStartOffset(int tupleIndex, int fIdx) {
        return fIdx == 0 ? 0 : IntSerDeUtils.getInt(array, getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
    }

    /** Start offset of field {@code fIdx} measured from the beginning of the frame. */
    @Override
    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
        return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + getFieldStartOffset(tupleIndex, fIdx);
    }

    @Override
    public int getFieldLength(int tupleIndex, int fIdx) {
        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
    }

    /**
     * Returns the length of the tuple in slot {@code tupleIndex}. For a deleted tuple (stored end offset not
     * positive) the negated length is returned, because the stored end offset is the negated original end.
     */
    @Override
    public int getTupleLength(int tupleIndex) {
        int endOffset = getTupleEndOffset(tupleIndex);
        if (endOffset > 0) {
            return endOffset - getTupleStartOffset(tupleIndex);
        } else {
            return endOffset + getTupleStartOffset(tupleIndex);
        }
    }

    /** Writes the start offset and the (positive) end offset {@code start + length} for a slot. */
    private void setTupleOffsets(int tupleIndex, int start, int length) {
        setTupleStartOffset(tupleIndex, start);
        setTupleEndOffset(tupleIndex, start + length);
    }

    /** Returns the stored end offset of a slot; it is not positive if the tuple has been deleted. */
    @Override
    public int getTupleEndOffset(int tupleIndex) {
        return IntSerDeUtils.getInt(array, getTupleEndSlotOffset(tupleIndex));
    }

    private void setTupleEndOffset(int tupleIndex, int offset) {
        IntSerDeUtils.putInt(array, getTupleEndSlotOffset(tupleIndex), offset);
    }

    @Override
    public int getTupleStartOffset(int tupleIndex) {
        return IntSerDeUtils.getInt(array, getTupleStartSlotOffset(tupleIndex));
    }

    public void setTupleStartOffset(int tupleIndex, int offset) {
        IntSerDeUtils.putInt(array, getTupleStartSlotOffset(tupleIndex), offset);
    }

    /** Byte offset of the int holding the start offset of slot {@code tupleIndex}. */
    public int getTupleStartSlotOffset(int tupleIndex) {
        return indexSlotsOffset - SIZE_OFFSET_GROUP * tupleIndex - SIZE_START_OFFSET;
    }

    /** Byte offset of the int holding the end offset of slot {@code tupleIndex}, just below its start offset. */
    public int getTupleEndSlotOffset(int tupleIndex) {
        return getTupleStartSlotOffset(tupleIndex) - SIZE_END_OFFSET;
    }

    /** Returns the number of index slots, including slots whose tuples have been deleted. */
    @Override
    public int getTupleCount() {
        return indexCount;
    }

    /** Counts slots whose tuple has not been deleted (end offset positive). */
    private int getLiveTupleCount() {
        int live = 0;
        for (int i = 0; i < indexCount; ++i) {
            int endOffset = getTupleEndOffset(i);
            if (endOffset > 0) {
                live++;
            }
        }
        return live;
    }

    @Override
    public ByteBuffer getBuffer() {
        return buffer;
    }

    /** Prints {@code (liveTupleCount, indexCount)} to {@code ps}. */
    @Override
    public void printStats(PrintStream ps) {
        ps.printf("(%d, %d)", getLiveTupleCount(), getIndexCount());
    }
}