blob: 1ed34f6c6158e5a8350b99daabaa05bac2b9a95d [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hyracks.dataflow.std.buffermanager;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hyracks.api.comm.FixedSizeFrame;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
 * This buffer manager divides the buffers among a given number of partitions.
 * A cleared partition (one that was spilled on the caller side) can get at most one frame.
 */
public class VPartitionTupleBufferManager implements IPartitionedTupleBufferManager {
public static final IPartitionedMemoryConstrain NO_CONSTRAIN = new IPartitionedMemoryConstrain() {
public int frameLimit(int partitionId) {
return Integer.MAX_VALUE;
// Source of frames for all partitions; buffers are allocated from it and handed back to it.
private IDeallocatableFramePool framePool;
// One frame list per partition; an entry stays null until the partition first needs a frame.
private IFrameBufferManager[] partitionArray;
// Tuple counter for each partition, indexed by partition id.
private int[] numTuples;
// Frame wrapper and appender shared by all partitions when writing tuples into a buffer.
private final FixedSizeFrame appendFrame;
private final FixedSizeFrameTupleAppender appender;
// Scratch BufferInfo passed to getFrame(...) calls; its contents are overwritten on each call.
private BufferInfo tempInfo;
// Supplies the per-partition frame limit consulted before a partition is given another frame.
private final IPartitionedMemoryConstrain constrain;
/**
 * Creates a buffer manager that serves {@code partitions} partitions from one shared frame pool.
 *
 * @param ctx               frame-manager context handed to the underlying frame pool
 * @param constrain         supplies the frame limit of each partition
 * @param partitions        number of partitions to manage
 * @param frameLimitInBytes memory budget handed to the underlying frame pool
 * @throws HyracksDataException declared by the signature; propagated if setup raises it
 */
public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, IPartitionedMemoryConstrain constrain,
        int partitions, int frameLimitInBytes) throws HyracksDataException {
    // Only 'constrain' is shadowed by a parameter, so it alone needs the 'this.' qualifier.
    this.constrain = constrain;
    framePool = new DeallocatableFramePool(ctx, frameLimitInBytes);

    // Per-partition bookkeeping: frame lists start out null, tuple counters at zero.
    partitionArray = new IFrameBufferManager[partitions];
    numTuples = new int[partitions];

    // Scratch objects shared by every insert and lookup.
    appendFrame = new FixedSizeFrame();
    appender = new FixedSizeFrameTupleAppender();
    tempInfo = new BufferInfo(null, -1, -1);
}
public void reset() throws HyracksDataException {
for (IFrameBufferManager part : partitionArray) {
if (part != null) {
for (int i = 0; i < part.getNumFrames(); i++) {
framePool.deAllocateBuffer(part.getFrame(i, tempInfo).getBuffer());
Arrays.fill(numTuples, 0);
public int getNumPartitions() {
return partitionArray.length;
public int getNumTuples(int partition) {
return numTuples[partition];
public int getPhysicalSize(int partitionId) {
int size = 0;
IFrameBufferManager partition = partitionArray[partitionId];
if (partition != null) {
for (int i = 0; i < partition.getNumFrames(); ++i) {
size += partition.getFrame(i, tempInfo).getLength();
return size;
public void clearPartition(int partitionId) throws HyracksDataException {
IFrameBufferManager partition = partitionArray[partitionId];
if (partition != null) {
for (int i = 0; i < partition.getNumFrames(); ++i) {
framePool.deAllocateBuffer(partition.getFrame(i, tempInfo).getBuffer());
numTuples[partitionId] = 0;
public boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
TuplePointer pointer) throws HyracksDataException {
int actualSize = calculateActualSize(fieldEndOffsets, size);
int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize);
if (fid < 0) {
return false;
partitionArray[partition].getFrame(fid, tempInfo);
int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
if (tid < 0) {
if (partitionArray[partition].getNumFrames() >= constrain.frameLimit(partition)) {
return false;
fid = createNewBuffer(partition, actualSize);
if (fid < 0) {
return false;
partitionArray[partition].getFrame(fid, tempInfo);
tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
pointer.reset(makeGroupFrameId(partition, fid), tid);
return true;
public boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int tupleId, TuplePointer pointer)
throws HyracksDataException {
return insertTuple(partition, tupleAccessor.getBuffer().array(), null,
tupleAccessor.getTupleStartOffset(tupleId), tupleAccessor.getTupleLength(tupleId), pointer);
private static int calculateActualSize(int[] fieldEndOffsets, int size) {
if (fieldEndOffsets != null) {
return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
return FrameHelper.calcRequiredSpace(0, size);
private int makeGroupFrameId(int partition, int fid) {
return fid * getNumPartitions() + partition;
private int parsePartitionId(int externalFrameId) {
return externalFrameId % getNumPartitions();
private int parseFrameIdInPartition(int externalFrameId) {
return externalFrameId / getNumPartitions();
private int createNewBuffer(int partition, int size) throws HyracksDataException {
ByteBuffer newBuffer = requestNewBufferFromPool(size);
if (newBuffer == null) {
return -1;
appender.reset(appendFrame, true);
return partitionArray[partition].insertFrame(newBuffer);
private ByteBuffer requestNewBufferFromPool(int recordSize) throws HyracksDataException {
int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, framePool.getMinFrameSize());
return framePool.allocateFrame(frameSize);
private int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start, int size)
throws HyracksDataException {
assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
appender.reset(appendFrame, false);
if (fieldEndOffsets == null) {
if (appender.append(byteArray, start, size)) {
return appender.getTupleCount() - 1;
} else {
if (appender.append(fieldEndOffsets, byteArray, start, size)) {
return appender.getTupleCount() - 1;
return -1;
private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
partitionArray[partition] = new PartitionFrameBufferManager();
return createNewBuffer(partition, actualSize);
return partitionArray[partition].getNumFrames() - 1;
public void close() {
Arrays.fill(partitionArray, null);
private static class PartitionFrameBufferManager implements IFrameBufferManager {
ArrayList<ByteBuffer> buffers = new ArrayList<>();
public void reset() throws HyracksDataException {
public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
returnedInfo.reset(buffers.get(frameIndex), 0, buffers.get(frameIndex).capacity());
return returnedInfo;
public int getNumFrames() {
return buffers.size();
public int insertFrame(ByteBuffer frame) throws HyracksDataException {
return buffers.size() - 1;
public void close() {
buffers = null;
public ITuplePointerAccessor getTuplePointerAccessor(final RecordDescriptor recordDescriptor) {
return new AbstractTuplePointerAccessor() {
FrameTupleAccessor innerAccessor = new FrameTupleAccessor(recordDescriptor);
IFrameTupleAccessor getInnerAccessor() {
return innerAccessor;
void resetInnerAccessor(TuplePointer tuplePointer) {
.getFrame(parseFrameIdInPartition(tuplePointer.getFrameIndex()), tempInfo);
innerAccessor.reset(tempInfo.getBuffer(), tempInfo.getStartOffset(), tempInfo.getLength());
public void flushPartition(int pid, IFrameWriter writer) throws HyracksDataException {
IFrameBufferManager partition = partitionArray[pid];
if (partition != null && getNumTuples(pid) > 0) {
for (int i = 0; i < partition.getNumFrames(); ++i) {
partition.getFrame(i, tempInfo);
tempInfo.getBuffer().limit(tempInfo.getStartOffset() + tempInfo.getLength());