blob: a0940612af705a94e4e669f91eb7aba0e43275ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.dataflow.std.buffermanager;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hyracks.api.comm.FixedSizeFrame;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FixedSizeFrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
 * This buffer manager divides the buffers of one shared frame pool among a given number of partitions.
 * <p>
 * The first frame of a partition is requested from the pool unconditionally; any further frame is only requested
 * while the partition holds fewer frames than {@link IPartitionedMemoryConstrain#frameLimit(int)} allows.
 * A cleared partition (a spilled one on the caller side) is meant to get no more than one frame; that limit is
 * expected to be expressed through the supplied constraint.
 * <p>
 * Tuples are addressed from the outside through a {@link TuplePointer} whose frame index encodes both the
 * partition and the frame position inside that partition (see {@link #makeGroupFrameId(int, int)}).
 */
public class VPartitionTupleBufferManager implements IPartitionedTupleBufferManager {

    /** Constraint that imposes no per-partition limit: every partition may hold up to {@link Integer#MAX_VALUE} frames. */
    public static final IPartitionedMemoryConstrain NO_CONSTRAIN = new IPartitionedMemoryConstrain() {
        @Override
        public int frameLimit(int partitionId) {
            return Integer.MAX_VALUE;
        }
    };

    /** Pool every partition allocates its frames from and returns them to. */
    protected IDeallocatableFramePool framePool;
    /** Per-partition frame holders; an entry stays {@code null} until the first tuple is inserted into it. */
    protected IFrameBufferManager[] partitionArray;
    /** Number of tuples currently stored in each partition. */
    protected int[] numTuples;
    /** Frame wrapper pointing at the buffer the appender is currently writing to. */
    private final FixedSizeFrame appendFrame;
    /** Appender reused for every insertion; re-targeted whenever the destination buffer changes. */
    private final FixedSizeFrameTupleAppender appender;
    /** Scratch object filled by {@code getFrame(...)} calls; shared by all methods and accessors of this instance. */
    protected BufferInfo tempInfo;
    /** Supplies the maximum number of frames each partition may hold. */
    protected final IPartitionedMemoryConstrain constrain;

    /**
     * Creates a manager with {@code partitions} initially empty partitions.
     *
     * @param ctx               frame manager context handed to the underlying {@link DeallocatableFramePool}
     * @param constrain         per-partition frame limit consulted before a partition grows beyond its first frame
     * @param partitions        number of partitions
     * @param frameLimitInBytes memory budget handed to the underlying {@link DeallocatableFramePool}
     * @throws HyracksDataException declared by the signature; nothing in this constructor throws it directly
     */
    public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, IPartitionedMemoryConstrain constrain,
            int partitions, int frameLimitInBytes) throws HyracksDataException {
        this.constrain = constrain;
        this.framePool = new DeallocatableFramePool(ctx, frameLimitInBytes);
        this.partitionArray = new IFrameBufferManager[partitions];
        this.numTuples = new int[partitions];
        this.appendFrame = new FixedSizeFrame();
        this.appender = new FixedSizeFrameTupleAppender();
        this.tempInfo = new BufferInfo(null, -1, -1);
    }

    /**
     * Returns every frame of every partition to the pool, zeroes all tuple counters and detaches the
     * append frame from its buffer.
     */
    @Override
    public void reset() throws HyracksDataException {
        for (IFrameBufferManager part : partitionArray) {
            releaseFrames(part);
        }
        Arrays.fill(numTuples, 0);
        appendFrame.reset(null);
    }

    /** @return the number of partitions this manager was created with */
    @Override
    public int getNumPartitions() {
        return partitionArray.length;
    }

    /** @return the number of tuples currently stored in {@code partition} */
    @Override
    public int getNumTuples(int partition) {
        return numTuples[partition];
    }

    /**
     * @return the number of frames held by {@code partition}; {@code 0} when the partition has not been
     *         allocated yet (or the manager has been closed), consistent with {@link #getPhysicalSize(int)}
     */
    @Override
    public int getNumFrames(int partition) {
        IFrameBufferManager frames = partitionArray[partition];
        return frames == null ? 0 : frames.getNumFrames();
    }

    /**
     * Sums the lengths of the frames held by the given partition.
     *
     * @return the total length in bytes, or {@code 0} when the partition has not been allocated
     */
    @Override
    public int getPhysicalSize(int partitionId) {
        int size = 0;
        IFrameBufferManager partition = partitionArray[partitionId];
        if (partition != null) {
            // NOTE(review): indexing 0..getNumFrames()-1 assumes the partition's frame indexes are dense.
            // PartitionFrameBufferManager.removeFrame can leave gaps — confirm no caller removes frames first.
            for (int i = 0; i < partition.getNumFrames(); ++i) {
                size += partition.getFrame(i, tempInfo).getLength();
            }
        }
        return size;
    }

    /**
     * Returns all frames of one partition to the pool and zeroes its tuple counter.
     * A partition that has not been allocated only has its counter zeroed.
     */
    @Override
    public void clearPartition(int partitionId) throws HyracksDataException {
        releaseFrames(partitionArray[partitionId]);
        numTuples[partitionId] = 0;
    }

    /**
     * Hands every frame of {@code partition} back to the pool and resets the partition.
     * Uses the partition's iterator protocol so that only occupied frame slots are visited.
     *
     * @param partition the partition to drain; {@code null} is ignored
     */
    private void releaseFrames(IFrameBufferManager partition) throws HyracksDataException {
        if (partition == null) {
            return;
        }
        partition.resetIterator();
        int frameIndex = partition.next();
        while (partition.exists()) {
            framePool.deAllocateBuffer(partition.getFrame(frameIndex, tempInfo).getBuffer());
            frameIndex = partition.next();
        }
        partition.reset();
    }

    /**
     * Appends one tuple to the given partition.
     * <p>
     * The tuple goes into the partition's last frame; when it does not fit, one more frame is requested,
     * provided the partition is still below {@code constrain.frameLimit(partition)}.
     *
     * @param partition       target partition
     * @param byteArray       array holding the tuple bytes
     * @param fieldEndOffsets field end offsets of the tuple, or {@code null} when the bytes in
     *                        {@code byteArray} are appended as they are
     * @param start           offset of the tuple data in {@code byteArray}
     * @param size            length of the tuple data in bytes
     * @param pointer         receives the location of the inserted tuple on success
     * @return {@code true} if the tuple was stored; {@code false} if no frame could be obtained from the pool
     *         or the partition already reached its frame limit
     */
    @Override
    public boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
            TuplePointer pointer) throws HyracksDataException {
        int actualSize = calculateActualSize(fieldEndOffsets, size);
        int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize);
        if (fid < 0) {
            return false;
        }
        partitionArray[partition].getFrame(fid, tempInfo);
        int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
        if (tid < 0) {
            // The last frame is full: grow the partition only if its constraint still allows it.
            if (partitionArray[partition].getNumFrames() >= constrain.frameLimit(partition)) {
                return false;
            }
            fid = createNewBuffer(partition, actualSize);
            if (fid < 0) {
                return false;
            }
            partitionArray[partition].getFrame(fid, tempInfo);
            tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
        }
        pointer.reset(makeGroupFrameId(partition, fid), tid);
        numTuples[partition]++;
        return true;
    }

    /**
     * Appends the tuple {@code tupleId} of {@code tupleAccessor} to the given partition by copying its bytes
     * straight from the accessor's backing array (no field end offsets are passed).
     *
     * @return see {@link #insertTuple(int, byte[], int[], int, int, TuplePointer)}
     */
    @Override
    public boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int tupleId, TuplePointer pointer)
            throws HyracksDataException {
        return insertTuple(partition, tupleAccessor.getBuffer().array(), null,
                tupleAccessor.getTupleStartOffset(tupleId), tupleAccessor.getTupleLength(tupleId), pointer);
    }

    /**
     * Computes the space a tuple needs via {@link FrameHelper#calcRequiredSpace(int, int)}, using the number
     * of field end offsets as the field count, or {@code 0} fields when {@code fieldEndOffsets} is {@code null}.
     */
    protected static int calculateActualSize(int[] fieldEndOffsets, int size) {
        int fieldCount = fieldEndOffsets == null ? 0 : fieldEndOffsets.length;
        return FrameHelper.calcRequiredSpace(fieldCount, size);
    }

    /** Encodes a partition id and a frame index inside that partition into one external frame id. */
    protected int makeGroupFrameId(int partition, int fid) {
        return fid * getNumPartitions() + partition;
    }

    /** Extracts the partition id from an id produced by {@link #makeGroupFrameId(int, int)}. */
    protected int parsePartitionId(int externalFrameId) {
        return externalFrameId % getNumPartitions();
    }

    /** Extracts the in-partition frame index from an id produced by {@link #makeGroupFrameId(int, int)}. */
    protected int parseFrameIdInPartition(int externalFrameId) {
        return externalFrameId / getNumPartitions();
    }

    /**
     * Requests a frame able to hold {@code size} bytes from the pool, points the appender at it and adds it
     * to the given partition.
     *
     * @return the index of the new frame inside the partition, or {@code -1} if the pool returned no frame
     */
    protected int createNewBuffer(int partition, int size) throws HyracksDataException {
        ByteBuffer newBuffer = requestNewBufferFromPool(size);
        if (newBuffer == null) {
            return -1;
        }
        appendFrame.reset(newBuffer);
        appender.reset(appendFrame, true);
        return partitionArray[partition].insertFrame(newBuffer);
    }

    /** Asks the pool for a frame whose size is computed to fit a record of {@code recordSize} bytes. */
    private ByteBuffer requestNewBufferFromPool(int recordSize) throws HyracksDataException {
        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, framePool.getMinFrameSize());
        return framePool.allocateFrame(frameSize);
    }

    /**
     * Appends a tuple to the buffer described by {@code bufferInfo}, re-targeting the shared appender first
     * when it currently points at a different buffer.
     *
     * @return the index of the appended tuple inside the frame, or {@code -1} if the appender rejected it
     */
    protected int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start,
            int size) throws HyracksDataException {
        assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
        if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
            appendFrame.reset(bufferInfo.getBuffer());
            // 'false' here versus 'true' in createNewBuffer — presumably keeps the tuples already in the frame.
            appender.reset(appendFrame, false);
        }
        boolean appended = fieldEndOffsets == null ? appender.append(byteArray, start, size)
                : appender.append(fieldEndOffsets, byteArray, start, size);
        return appended ? appender.getTupleCount() - 1 : -1;
    }

    /**
     * Returns the index of the partition's last frame, creating the partition holder and its first frame
     * when the partition is missing or holds no frames.
     *
     * @return the frame index, or {@code -1} if a first frame was needed but the pool returned none
     */
    protected int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
        if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
            partitionArray[partition] = new PartitionFrameBufferManager();
            return createNewBuffer(partition, actualSize);
        }
        // NOTE(review): getNumFrames() - 1 is the last frame only while frame indexes are dense — confirm
        // that frames are never removed from partitions managed by this class.
        return partitionArray[partition].getNumFrames() - 1;
    }

    /** Closes the frame pool and drops the references to all partition holders. */
    @Override
    public void close() {
        framePool.close();
        Arrays.fill(partitionArray, null);
    }

    /**
     * Frame holder of one partition: a list of buffers in which a removed frame leaves a {@code null} slot
     * that a later insertion reuses.
     */
    static class PartitionFrameBufferManager implements IFrameBufferManager {

        /** Number of occupied (non-null) slots in {@link #buffers}. */
        int size = 0;
        /** Frame slots; a {@code null} entry marks a removed frame. */
        ArrayList<ByteBuffer> buffers = new ArrayList<>();
        /** Cursor of the {@code resetIterator()/next()/exists()} protocol; {@code -1} means "before the first slot". */
        int iterator = -1;

        /** Forgets all frames; the buffers themselves are not released here. */
        @Override
        public void reset() throws HyracksDataException {
            buffers.clear();
            size = 0;
        }

        /**
         * Fills {@code returnedInfo} with the buffer stored at {@code frameIndex}, start offset {@code 0} and
         * the buffer's capacity as length.
         */
        @Override
        public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
            ByteBuffer frame = buffers.get(frameIndex);
            returnedInfo.reset(frame, 0, frame.capacity());
            return returnedInfo;
        }

        /** @return the number of frames currently held (removed slots are not counted) */
        @Override
        public int getNumFrames() {
            return size;
        }

        /**
         * Stores {@code frame} at the end of the list when there is no free slot, otherwise in the first
         * free ({@code null}) slot.
         *
         * @return the index the frame was stored at
         * @throws HyracksDataException if a free slot was expected but none was found
         */
        @Override
        public int insertFrame(ByteBuffer frame) throws HyracksDataException {
            int index = -1;
            if (buffers.size() == size) {
                // No gaps: append.
                buffers.add(frame);
                index = buffers.size() - 1;
            } else {
                // At least one slot was freed by removeFrame: reuse the first one.
                for (int i = 0; i < buffers.size(); ++i) {
                    if (buffers.get(i) == null) {
                        buffers.set(i, frame);
                        index = i;
                        break;
                    }
                }
            }
            if (index == -1) {
                throw new HyracksDataException("Did not insert frame.");
            }
            size++;
            return index;
        }

        /** Empties the slot at {@code frameIndex}; the indexes of the other frames do not change. */
        @Override
        public void removeFrame(int frameIndex) {
            buffers.set(frameIndex, null);
            size--;
        }

        /** Drops the slot list; the instance must not be used afterwards. */
        @Override
        public void close() {
            buffers = null;
        }

        /**
         * Advances the cursor to the next occupied slot.
         *
         * @return the new cursor position; it is not a valid frame index when {@link #exists()} is {@code false}
         */
        @Override
        public int next() {
            while (++iterator < buffers.size()) {
                if (buffers.get(iterator) != null) {
                    break;
                }
            }
            return iterator;
        }

        /**
         * @return {@code true} if the cursor points at an occupied slot; {@code false} before the first
         *         {@link #next()} call and once the cursor has moved past the last slot
         */
        @Override
        public boolean exists() {
            // The lower bound matters right after resetIterator(), when the cursor is still -1.
            return iterator >= 0 && iterator < buffers.size() && buffers.get(iterator) != null;
        }

        /** Moves the cursor back in front of the first slot. */
        @Override
        public void resetIterator() {
            iterator = -1;
        }
    }

    /**
     * Creates an accessor that positions itself on the frame a {@link TuplePointer} refers to.
     * The accessor shares this manager's scratch {@code tempInfo} object.
     */
    @Override
    public ITuplePointerAccessor getTuplePointerAccessor(final RecordDescriptor recordDescriptor) {
        return new AbstractTuplePointerAccessor() {
            FrameTupleAccessor innerAccessor = new FrameTupleAccessor(recordDescriptor);

            @Override
            IFrameTupleAccessor getInnerAccessor() {
                return innerAccessor;
            }

            @Override
            void resetInnerAccessor(TuplePointer tuplePointer) {
                // The pointer's frame index is an external id: decode partition and in-partition frame index.
                partitionArray[parsePartitionId(tuplePointer.getFrameIndex())]
                        .getFrame(parseFrameIdInPartition(tuplePointer.getFrameIndex()), tempInfo);
                innerAccessor.reset(tempInfo.getBuffer(), tempInfo.getStartOffset(), tempInfo.getLength());
            }
        };
    }

    /**
     * Creates an accessor that positions itself on a frame given by its external frame id.
     * The accessor shares this manager's scratch {@code tempInfo} object.
     */
    @Override
    public ITupleAccessor getTupleAccessor(final RecordDescriptor recordDescriptor) {
        return new AbstractTupleAccessor() {
            FrameTupleAccessor innerAccessor = new FrameTupleAccessor(recordDescriptor);

            @Override
            IFrameTupleAccessor getInnerAccessor() {
                return innerAccessor;
            }

            @Override
            void resetInnerAccessor(int frameIndex) {
                partitionArray[parsePartitionId(frameIndex)].getFrame(parseFrameIdInPartition(frameIndex), tempInfo);
                innerAccessor.reset(tempInfo.getBuffer(), tempInfo.getStartOffset(), tempInfo.getLength());
            }

            @Override
            int getFrameCount() {
                // NOTE(review): this is the number of partitions, not a number of frames — confirm intended.
                return partitionArray.length;
            }
        };
    }

    /**
     * Passes every frame of the given partition to {@code writer}, with each buffer's position and limit set
     * to the frame's start offset and end. Nothing is written when the partition is missing or has no tuples.
     * The frames stay in the partition afterwards.
     */
    @Override
    public void flushPartition(int pid, IFrameWriter writer) throws HyracksDataException {
        IFrameBufferManager partition = partitionArray[pid];
        if (partition != null && getNumTuples(pid) > 0) {
            // NOTE(review): indexing 0..getNumFrames()-1 assumes dense frame indexes — see removeFrame.
            for (int i = 0; i < partition.getNumFrames(); ++i) {
                partition.getFrame(i, tempInfo);
                tempInfo.getBuffer().position(tempInfo.getStartOffset());
                tempInfo.getBuffer().limit(tempInfo.getStartOffset() + tempInfo.getLength());
                writer.nextFrame(tempInfo.getBuffer());
            }
        }
    }

    /**
     * @return the frame holder of {@code partition}, or {@code null} when the partition has not been
     *         allocated or currently holds no frames
     */
    public IFrameBufferManager getPartitionFrameBufferManager(int partition) {
        if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
            return null;
        }
        return partitionArray[partition];
    }
}