blob: 9a1fac21e6136de0a612469bf8e670ab8f064607 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.page;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import sun.nio.ch.DirectBuffer;
/**
 * Column page that keeps its data in memory obtained from {@link UnsafeMemoryManager} and accessed
 * through {@link CarbonUnsafe}. Only fixed-length store data types are supported: boolean, byte,
 * short, short-int (3 bytes per row), int, long, float, double, and byte arrays whose rows all
 * have the same width ({@code eachRowSize}).
 *
 * <p>Row {@code r} is stored at byte offset {@code r * rowWidth} from {@code baseOffset}. The
 * backing block is grown on demand by {@link #ensureMemory(int)}, but that growth is driven by the
 * number of bytes written so far ({@code totalLength}) rather than by the row id being written.
 * NOTE: callers are therefore expected to write rows sequentially starting at row 0, each row
 * once; out-of-order or repeated writes are not bounds-checked against the allocated block.
 */
public class UnsafeFixLengthColumnPage extends ColumnPage {

  // memory allocated by Unsafe
  private MemoryBlock memoryBlock;

  // base object of memoryBlock, passed as the base argument of every Unsafe access
  private Object baseAddress;

  // base offset of memoryBlock
  private long baseOffset;

  // width in bytes of one byte-array row; only set by the three-argument constructor
  private int eachRowSize;

  // the number of bytes added to the page so far
  private int totalLength;

  // size of the allocated memory, in bytes
  private int capacity;

  private final String taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();

  // shift amounts that turn a row id into a byte offset for each primitive type
  private static final int byteBits = DataTypes.BYTE.getSizeBits();
  private static final int shortBits = DataTypes.SHORT.getSizeBits();
  private static final int intBits = DataTypes.INT.getSizeBits();
  private static final int longBits = DataTypes.LONG.getSizeBits();
  private static final int floatBits = DataTypes.FLOAT.getSizeBits();
  private static final int doubleBits = DataTypes.DOUBLE.getSizeBits();

  /**
   * Creates a page for {@code pageSize} rows of the meta's store data type. Memory is allocated up
   * front for boolean, byte, short, int, long, float, double and short-int; nothing is allocated
   * here for any other type (byte-array pages are sized by the three-argument constructor).
   *
   * @param columnPageEncoderMeta encoder meta; its store data type selects the row layout
   * @param pageSize number of rows to reserve memory for
   * @throws UnsupportedOperationException if the store data type is a decimal or string
   */
  UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
    super(columnPageEncoderMeta, pageSize);
    DataType storeType = columnPageEncoderMeta.getStoreDataType();
    if (storeType == DataTypes.BOOLEAN ||
        storeType == DataTypes.BYTE ||
        storeType == DataTypes.SHORT ||
        storeType == DataTypes.INT ||
        storeType == DataTypes.LONG ||
        storeType == DataTypes.FLOAT ||
        storeType == DataTypes.DOUBLE) {
      allocate(pageSize << storeType.getSizeBits());
    } else if (storeType == DataTypes.SHORT_INT) {
      // short-int rows are 3 bytes wide, which is not a power of two
      allocate(pageSize * 3);
    } else if (DataTypes.isDecimal(storeType) || storeType == DataTypes.STRING) {
      throw new UnsupportedOperationException(
          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
    }
    totalLength = 0;
  }

  /**
   * Creates a page whose rows are {@code eachRowSize} bytes wide. For a byte-array store type,
   * {@code pageSize * eachRowSize} bytes are allocated here; other types are allocated by the
   * two-argument constructor.
   *
   * @param columnPageEncoderMeta encoder meta; its store data type selects the row layout
   * @param pageSize number of rows to reserve memory for
   * @param eachRowSize width in bytes of every byte-array row
   */
  UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize,
      int eachRowSize) {
    this(columnPageEncoderMeta, pageSize);
    this.eachRowSize = eachRowSize;
    totalLength = 0;
    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
      allocate(pageSize * eachRowSize);
    }
  }

  /**
   * Allocates a fresh block of {@code size} bytes and points this page at it. Any previously held
   * block is not freed here; callers only use this while no block is held yet.
   */
  private void allocate(int size) {
    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
    baseAddress = memoryBlock.getBaseObject();
    baseOffset = memoryBlock.getBaseOffset();
    capacity = size;
  }

  /**
   * Fails once the page content approaches 2GB.
   *
   * @throws RuntimeException if {@code totalLength} exceeds {@code Integer.MAX_VALUE - 16}
   */
  private void checkDataFileSize() {
    // 16 is a watermark to stop the length from overflowing.
    if (totalLength > (Integer.MAX_VALUE - 16)) {
      // since we later store a column page in a byte array, so its maximum size is 2GB
      throw new RuntimeException("Carbondata only support maximum 2GB size for one column page");
    }
  }

  /** Grows the recorded row count so that it covers {@code rowId}. */
  private void updatePageSize(int rowId) {
    // rows 0..rowId now exist, so the page must report at least rowId + 1 rows
    if (rowId >= pageSize) {
      pageSize = rowId + 1;
    }
  }

  @Override
  public void putByte(int rowId, byte value) {
    ensureMemory(ByteUtil.SIZEOF_BYTE);
    long offset = ((long) rowId) << byteBits;
    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, value);
    totalLength += ByteUtil.SIZEOF_BYTE;
    updatePageSize(rowId);
  }

  @Override
  public void putShort(int rowId, short value) {
    // reserve the size of a short in bytes (shortBits is only the shift amount)
    ensureMemory(ByteUtil.SIZEOF_SHORT);
    long offset = ((long) rowId) << shortBits;
    CarbonUnsafe.getUnsafe().putShort(baseAddress, baseOffset + offset, value);
    totalLength += ByteUtil.SIZEOF_SHORT;
    updatePageSize(rowId);
  }

  /** Stores {@code value} as the 3 bytes produced by {@code ByteUtil.to3Bytes}. */
  @Override
  public void putShortInt(int rowId, int value) {
    ensureMemory(ByteUtil.SIZEOF_SHORT_INT);
    byte[] data = ByteUtil.to3Bytes(value);
    long offset = rowId * 3L;
    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, data[0]);
    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 1, data[1]);
    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 2, data[2]);
    totalLength += ByteUtil.SIZEOF_SHORT_INT;
    updatePageSize(rowId);
  }

  @Override
  public void putInt(int rowId, int value) {
    ensureMemory(ByteUtil.SIZEOF_INT);
    long offset = ((long) rowId) << intBits;
    CarbonUnsafe.getUnsafe().putInt(baseAddress, baseOffset + offset, value);
    totalLength += ByteUtil.SIZEOF_INT;
    updatePageSize(rowId);
  }

  @Override
  public void putLong(int rowId, long value) {
    ensureMemory(ByteUtil.SIZEOF_LONG);
    long offset = ((long) rowId) << longBits;
    CarbonUnsafe.getUnsafe().putLong(baseAddress, baseOffset + offset, value);
    totalLength += ByteUtil.SIZEOF_LONG;
    updatePageSize(rowId);
  }

  @Override
  public void putDouble(int rowId, double value) {
    ensureMemory(ByteUtil.SIZEOF_DOUBLE);
    long offset = ((long) rowId) << doubleBits;
    CarbonUnsafe.getUnsafe().putDouble(baseAddress, baseOffset + offset, value);
    totalLength += ByteUtil.SIZEOF_DOUBLE;
    updatePageSize(rowId);
  }

  @Override
  public void putFloat(int rowId, float value) {
    ensureMemory(ByteUtil.SIZEOF_FLOAT);
    long offset = ((long) rowId) << floatBits;
    CarbonUnsafe.getUnsafe().putFloat(baseAddress, baseOffset + offset, value);
    totalLength += ByteUtil.SIZEOF_FLOAT;
    updatePageSize(rowId);
  }

  /**
   * Copies {@code bytes} into the fixed-width slot of {@code rowId}. A value shorter than
   * {@code eachRowSize} leaves the rest of the slot unwritten.
   *
   * @throws IllegalArgumentException if {@code bytes} is longer than {@code eachRowSize}, which
   *     would otherwise overwrite the next row or memory past the block
   */
  @Override
  public void putBytes(int rowId, byte[] bytes) {
    if (bytes.length > eachRowSize) {
      throw new IllegalArgumentException(
          "value length " + bytes.length + " exceeds fixed row size " + eachRowSize);
    }
    ensureMemory(eachRowSize);
    // copy the data to memory
    long offset = (long) rowId * eachRowSize;
    CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseAddress,
        baseOffset + offset, bytes.length);
    updatePageSize(rowId);
    totalLength += eachRowSize;
  }

  @Override
  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
    throw new UnsupportedOperationException(
        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
  }

  @Override
  public void putDecimal(int rowId, BigDecimal decimal) {
    throw new UnsupportedOperationException(
        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
  }

  @Override
  public byte getByte(int rowId) {
    long offset = ((long) rowId) << byteBits;
    return CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
  }

  @Override
  public short getShort(int rowId) {
    long offset = ((long) rowId) << shortBits;
    return CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset);
  }

  /** Reads the 3 bytes of {@code rowId} and decodes them with {@code ByteUtil.valueOf3Bytes}. */
  @Override
  public int getShortInt(int rowId) {
    long offset = rowId * 3L;
    byte[] data = new byte[3];
    data[0] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
    data[1] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset + 1);
    data[2] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset + 2);
    return ByteUtil.valueOf3Bytes(data, 0);
  }

  @Override
  public int getInt(int rowId) {
    long offset = ((long) rowId) << intBits;
    return CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset);
  }

  @Override
  public long getLong(int rowId) {
    long offset = ((long) rowId) << longBits;
    return CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset);
  }

  @Override
  public float getFloat(int rowId) {
    long offset = ((long) rowId) << floatBits;
    return CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset);
  }

  @Override
  public double getDouble(int rowId) {
    long offset = ((long) rowId) << doubleBits;
    return CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset);
  }

  @Override
  public BigDecimal getDecimal(int rowId) {
    throw new UnsupportedOperationException(
        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
  }

  /** Returns a copy of the {@code eachRowSize} bytes stored for {@code rowId}. */
  @Override
  public byte[] getBytes(int rowId) {
    byte[] data = new byte[eachRowSize];
    // row offset is rowId * each column value length
    CarbonUnsafe.getUnsafe().copyMemory(baseAddress,
        baseOffset + ((long) rowId * eachRowSize), data,
        CarbonUnsafe.BYTE_ARRAY_OFFSET, eachRowSize);
    return data;
  }

  @Override
  public byte[] getDecimalPage() {
    throw new UnsupportedOperationException(
        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
  }

  @Override
  public byte[] getBytePage() {
    byte[] data = new byte[getEndLoop()];
    for (long i = 0; i < data.length; i++) {
      long offset = i << byteBits;
      data[(int) i] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
    }
    return data;
  }

  @Override
  public short[] getShortPage() {
    short[] data = new short[getEndLoop()];
    for (long i = 0; i < data.length; i++) {
      long offset = i << shortBits;
      data[(int) i] = CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset);
    }
    return data;
  }

  /** Returns the raw 3-byte-per-row content of the page. */
  @Override
  public byte[] getShortIntPage() {
    byte[] data = new byte[getEndLoop() * 3];
    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
        data, CarbonUnsafe.BYTE_ARRAY_OFFSET, data.length);
    return data;
  }

  @Override
  public int[] getIntPage() {
    int[] data = new int[getEndLoop()];
    for (long i = 0; i < data.length; i++) {
      long offset = i << intBits;
      data[(int) i] = CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset);
    }
    return data;
  }

  @Override
  public long[] getLongPage() {
    long[] data = new long[getEndLoop()];
    for (long i = 0; i < data.length; i++) {
      long offset = i << longBits;
      data[(int) i] = CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset);
    }
    return data;
  }

  @Override
  public float[] getFloatPage() {
    float[] data = new float[getEndLoop()];
    for (long i = 0; i < data.length; i++) {
      long offset = i << floatBits;
      data[(int) i] = CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset);
    }
    return data;
  }

  @Override
  public double[] getDoublePage() {
    double[] data = new double[getEndLoop()];
    for (long i = 0; i < data.length; i++) {
      long offset = i << doubleBits;
      data[(int) i] = CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset);
    }
    return data;
  }

  /** Returns one {@code eachRowSize}-byte array per row written so far. */
  @Override
  public byte[][] getByteArrayPage() {
    byte[][] data = new byte[getEndLoop()][eachRowSize];
    long offset = baseOffset;
    for (int i = 0; i < data.length; i++) {
      // rows are laid out back to back, eachRowSize bytes apart
      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, offset, data[i],
          CarbonUnsafe.BYTE_ARRAY_OFFSET, eachRowSize);
      offset += eachRowSize;
    }
    return data;
  }

  /**
   * Copies the byte-array rows written so far into a new direct {@link ByteBuffer} whose position
   * is 0.
   */
  @Override
  public ByteBuffer getByteBuffer() {
    int numRow = getEndLoop();
    int length = numRow * eachRowSize;
    ByteBuffer out = ByteBuffer.allocateDirect(length);
    // use the (base object, offset) form for the source so the copy is valid whether the block
    // is on-heap (non-null base object) or off-heap (null base object, absolute offset)
    CarbonUnsafe.getUnsafe().copyMemory(
        baseAddress, baseOffset, null, ((DirectBuffer) out).address(), length);
    return out;
  }

  @Override
  public byte[] getLVFlattenedBytePage() {
    throw new UnsupportedOperationException(
        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
  }

  /** Returns a copy of the first {@code totalLength} bytes of the page; {@code dataType} is unused. */
  @Override
  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) {
    byte[] data = new byte[totalLength];
    CarbonUnsafe.getUnsafe()
        .copyMemory(baseAddress, baseOffset, data, CarbonUnsafe.BYTE_ARRAY_OFFSET, totalLength);
    return data;
  }

  @Override
  public byte[] getComplexParentFlattenedBytePage() {
    throw new UnsupportedOperationException("internal error");
  }

  /**
   * Replaces the page content with {@code byteLength} bytes read from {@code src} at
   * {@code srcOffset}, growing the backing block if it is too small, and records
   * {@code byteLength} as the page length so row counts and page getters reflect the new data.
   */
  private void setPageData(Object src, long srcOffset, int byteLength) {
    // discard the current content so ensureMemory neither counts nor copies it
    totalLength = 0;
    ensureMemory(byteLength);
    CarbonUnsafe.getUnsafe().copyMemory(src, srcOffset, baseAddress, baseOffset, byteLength);
    totalLength = byteLength;
  }

  @Override
  public void setBytePage(byte[] byteData) {
    setPageData(byteData, CarbonUnsafe.BYTE_ARRAY_OFFSET, byteData.length << byteBits);
  }

  @Override
  public void setShortPage(short[] shortData) {
    setPageData(shortData, CarbonUnsafe.SHORT_ARRAY_OFFSET, shortData.length << shortBits);
  }

  /** Replaces the page content with raw 3-byte-per-row short-int data. */
  @Override
  public void setShortIntPage(byte[] shortIntData) {
    setPageData(shortIntData, CarbonUnsafe.BYTE_ARRAY_OFFSET, shortIntData.length);
  }

  @Override
  public void setIntPage(int[] intData) {
    setPageData(intData, CarbonUnsafe.INT_ARRAY_OFFSET, intData.length << intBits);
  }

  @Override
  public void setLongPage(long[] longData) {
    setPageData(longData, CarbonUnsafe.LONG_ARRAY_OFFSET, longData.length << longBits);
  }

  @Override
  public void setFloatPage(float[] floatData) {
    setPageData(floatData, CarbonUnsafe.FLOAT_ARRAY_OFFSET, floatData.length << floatBits);
  }

  @Override
  public void setDoublePage(double[] doubleData) {
    setPageData(doubleData, CarbonUnsafe.DOUBLE_ARRAY_OFFSET, doubleData.length << doubleBits);
  }

  @Override
  public void setByteArrayPage(byte[][] byteArray) {
    throw new UnsupportedOperationException(
        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
  }

  /**
   * Returns the backing block to {@link UnsafeMemoryManager} and clears the references to it.
   * Calling it again is a no-op. {@code totalLength} and {@code capacity} are left unchanged.
   */
  public void freeMemory() {
    if (memoryBlock != null) {
      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
      memoryBlock = null;
      baseAddress = null;
      baseOffset = 0;
    }
  }

  /**
   * Feeds every row written so far, together with its row id, to {@code codec.encode}.
   *
   * @throws UnsupportedOperationException if the store data type is not byte, short, int, long,
   *     float or double
   */
  @Override
  public void convertValue(ColumnPageValueConverter codec) {
    int endLoop = getEndLoop();
    DataType storeType = columnPageEncoderMeta.getStoreDataType();
    if (storeType == DataTypes.BYTE) {
      for (long i = 0; i < endLoop; i++) {
        long offset = i << byteBits;
        codec.encode((int) i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
      }
    } else if (storeType == DataTypes.SHORT) {
      for (long i = 0; i < endLoop; i++) {
        long offset = i << shortBits;
        codec.encode((int) i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
      }
    } else if (storeType == DataTypes.INT) {
      for (long i = 0; i < endLoop; i++) {
        long offset = i << intBits;
        codec.encode((int) i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
      }
    } else if (storeType == DataTypes.LONG) {
      for (long i = 0; i < endLoop; i++) {
        long offset = i << longBits;
        codec.encode((int) i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
      }
    } else if (storeType == DataTypes.FLOAT) {
      for (long i = 0; i < endLoop; i++) {
        long offset = i << floatBits;
        codec.encode((int) i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
      }
    } else if (storeType == DataTypes.DOUBLE) {
      for (long i = 0; i < endLoop; i++) {
        long offset = i << doubleBits;
        codec.encode((int) i,
            CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
      }
    } else {
      throw new UnsupportedOperationException(
          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
    }
  }

  /**
   * Returns the number of rows written so far, i.e. {@code totalLength} divided by the row width
   * of the store data type.
   *
   * @throws UnsupportedOperationException for store data types without a fixed row width here
   */
  private int getEndLoop() {
    DataType storeType = columnPageEncoderMeta.getStoreDataType();
    if (storeType == DataTypes.BYTE || storeType == DataTypes.BOOLEAN) {
      // boolean pages are allocated with one byte per row by the constructor
      return totalLength / ByteUtil.SIZEOF_BYTE;
    } else if (storeType == DataTypes.SHORT) {
      return totalLength / ByteUtil.SIZEOF_SHORT;
    } else if (storeType == DataTypes.SHORT_INT) {
      return totalLength / ByteUtil.SIZEOF_SHORT_INT;
    } else if (storeType == DataTypes.INT) {
      return totalLength / ByteUtil.SIZEOF_INT;
    } else if (storeType == DataTypes.LONG) {
      return totalLength / ByteUtil.SIZEOF_LONG;
    } else if (storeType == DataTypes.FLOAT) {
      return totalLength / DataTypes.FLOAT.getSizeInBytes();
    } else if (storeType == DataTypes.DOUBLE) {
      return totalLength / DataTypes.DOUBLE.getSizeInBytes();
    } else if (storeType == DataTypes.BYTE_ARRAY) {
      return totalLength / eachRowSize;
    } else {
      throw new UnsupportedOperationException(
          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
    }
  }

  @Override
  public long getPageLengthInBytes() {
    // For unsafe column page, we are always tracking the total length
    // so return it directly instead of calculate it again (super class implementation)
    return totalLength;
  }

  /**
   * Reallocates memory if the capacity is less than the current length plus {@code requestSize}.
   * The new block is at least twice the old capacity; the first {@code totalLength} bytes are
   * copied into it and the old block, if any, is freed.
   *
   * @param requestSize number of bytes about to be appended
   * @throws RuntimeException if the page already holds close to 2GB
   */
  protected void ensureMemory(int requestSize) {
    checkDataFileSize();
    if (totalLength + requestSize > capacity) {
      int newSize = Math.max(2 * capacity, totalLength + requestSize);
      MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
          newBlock.getBaseObject(), newBlock.getBaseOffset(), totalLength);
      // no block is held yet when the constructor did not allocate for this store type
      if (memoryBlock != null) {
        UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
      }
      memoryBlock = newBlock;
      baseAddress = newBlock.getBaseObject();
      baseOffset = newBlock.getBaseOffset();
      capacity = newSize;
    }
  }

  /** Returns the number of rows written so far (see {@code getEndLoop}). */
  public int getActualRowCount() {
    return getEndLoop();
  }
}