// blob: 5695b7021f1d64947f209c04fd6ee510da5febed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.page;
import java.io.IOException;
import java.math.BigDecimal;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
// This extension uses unsafe memory to store page data, for fix length data type only (byte,
// short, integer, long, float, double)
public class UnsafeFixLengthColumnPage extends ColumnPage {
// memory allocated by Unsafe
private MemoryBlock memoryBlock;
// base address of memoryBlock
private Object baseAddress;
// base offset of memoryBlock
private long baseOffset;
private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
private static final int byteBits = DataTypes.BYTE.getSizeBits();
private static final int shortBits = DataTypes.SHORT.getSizeBits();
private static final int intBits = DataTypes.INT.getSizeBits();
private static final int longBits = DataTypes.LONG.getSizeBits();
private static final int floatBits = DataTypes.FLOAT.getSizeBits();
private static final int doubleBits = DataTypes.DOUBLE.getSizeBits();
UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
throws MemoryException {
super(columnSpec, dataType, pageSize);
if (dataType == DataTypes.BOOLEAN ||
dataType == DataTypes.BYTE ||
dataType == DataTypes.SHORT ||
dataType == DataTypes.INT ||
dataType == DataTypes.LONG ||
dataType == DataTypes.FLOAT ||
dataType == DataTypes.DOUBLE) {
int size = pageSize << dataType.getSizeBits();
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
} else if (dataType == DataTypes.SHORT_INT) {
int size = pageSize * 3;
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
} else if (dataType == DataTypes.DECIMAL || dataType == DataTypes.STRING) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
}
@Override
public void putByte(int rowId, byte value) {
long offset = rowId << byteBits;
CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, value);
}
@Override
public void putShort(int rowId, short value) {
long offset = rowId << shortBits;
CarbonUnsafe.getUnsafe().putShort(baseAddress, baseOffset + offset, value);
}
@Override
public void putShortInt(int rowId, int value) {
byte[] data = ByteUtil.to3Bytes(value);
long offset = rowId * 3L;
CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, data[0]);
CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 1, data[1]);
CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 2, data[2]);
}
@Override
public void putInt(int rowId, int value) {
long offset = rowId << intBits;
CarbonUnsafe.getUnsafe().putInt(baseAddress, baseOffset + offset, value);
}
@Override
public void putLong(int rowId, long value) {
long offset = rowId << longBits;
CarbonUnsafe.getUnsafe().putLong(baseAddress, baseOffset + offset, value);
}
@Override
public void putDouble(int rowId, double value) {
long offset = rowId << doubleBits;
CarbonUnsafe.getUnsafe().putDouble(baseAddress, baseOffset + offset, value);
}
@Override
public void putBytes(int rowId, byte[] bytes) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override
public void putBytes(int rowId, byte[] bytes, int offset, int length) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override public void putDecimal(int rowId, BigDecimal decimal) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override
public byte getByte(int rowId) {
long offset = rowId << byteBits;
return CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
}
@Override
public short getShort(int rowId) {
long offset = rowId << shortBits;
return CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset);
}
@Override
public int getShortInt(int rowId) {
long offset = rowId * 3L;
byte[] data = new byte[3];
data[0] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
data[1] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset + 1);
data[2] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset + 2);
return ByteUtil.valueOf3Bytes(data, 0);
}
@Override
public int getInt(int rowId) {
long offset = rowId << intBits;
return CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset);
}
@Override
public long getLong(int rowId) {
long offset = rowId << longBits;
return CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset);
}
@Override
public float getFloat(int rowId) {
long offset = rowId << floatBits;
return CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset);
}
@Override
public double getDouble(int rowId) {
long offset = rowId << doubleBits;
return CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset);
}
@Override
public BigDecimal getDecimal(int rowId) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override
public byte[] getBytes(int rowId) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override public byte[] getDecimalPage() {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override
public byte[] getBytePage() {
byte[] data = new byte[getPageSize()];
for (int i = 0; i < data.length; i++) {
long offset = i << byteBits;
data[i] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
}
return data;
}
@Override
public short[] getShortPage() {
short[] data = new short[getPageSize()];
for (int i = 0; i < data.length; i++) {
long offset = i << shortBits;
data[i] = CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset);
}
return data;
}
@Override
public byte[] getShortIntPage() {
byte[] data = new byte[pageSize * 3];
CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
data, CarbonUnsafe.BYTE_ARRAY_OFFSET, data.length);
return data;
}
@Override
public int[] getIntPage() {
int[] data = new int[getPageSize()];
for (int i = 0; i < data.length; i++) {
long offset = i << intBits;
data[i] = CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset);
}
return data;
}
@Override
public long[] getLongPage() {
long[] data = new long[getPageSize()];
for (int i = 0; i < data.length; i++) {
long offset = i << longBits;
data[i] = CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset);
}
return data;
}
@Override
public float[] getFloatPage() {
float[] data = new float[getPageSize()];
for (int i = 0; i < data.length; i++) {
long offset = i << floatBits;
data[i] = CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset);
}
return data;
}
@Override
public double[] getDoublePage() {
double[] data = new double[getPageSize()];
for (int i = 0; i < data.length; i++) {
long offset = i << doubleBits;
data[i] = CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset);
}
return data;
}
@Override
public byte[][] getByteArrayPage() {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override
public byte[] getLVFlattenedBytePage() {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
@Override
public void setBytePage(byte[] byteData) {
CarbonUnsafe.getUnsafe().copyMemory(byteData, CarbonUnsafe.BYTE_ARRAY_OFFSET,
baseAddress, baseOffset, byteData.length << byteBits);
}
@Override
public void setShortPage(short[] shortData) {
CarbonUnsafe.getUnsafe().copyMemory(shortData, CarbonUnsafe.SHORT_ARRAY_OFFSET,
baseAddress, baseOffset, shortData.length << shortBits);
}
@Override
public void setShortIntPage(byte[] shortIntData) {
CarbonUnsafe.getUnsafe().copyMemory(shortIntData, CarbonUnsafe.BYTE_ARRAY_OFFSET,
baseAddress, baseOffset, shortIntData.length);
}
@Override
public void setIntPage(int[] intData) {
CarbonUnsafe.getUnsafe().copyMemory(intData, CarbonUnsafe.INT_ARRAY_OFFSET,
baseAddress, baseOffset, intData.length << intBits);
}
@Override
public void setLongPage(long[] longData) {
CarbonUnsafe.getUnsafe().copyMemory(longData, CarbonUnsafe.LONG_ARRAY_OFFSET,
baseAddress, baseOffset, longData.length << longBits);
}
@Override
public void setFloatPage(float[] floatData) {
CarbonUnsafe.getUnsafe().copyMemory(floatData, CarbonUnsafe.FLOAT_ARRAY_OFFSET,
baseAddress, baseOffset, floatData.length << floatBits);
}
@Override
public void setDoublePage(double[] doubleData) {
CarbonUnsafe.getUnsafe().copyMemory(doubleData, CarbonUnsafe.DOUBLE_ARRAY_OFFSET,
baseAddress, baseOffset, doubleData.length << doubleBits);
}
@Override
public void setByteArrayPage(byte[][] byteArray) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
public void freeMemory() {
if (memoryBlock != null) {
UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
memoryBlock = null;
baseAddress = null;
baseOffset = 0;
}
}
@Override
public void convertValue(ColumnPageValueConverter codec) {
int pageSize = getPageSize();
if (dataType == DataTypes.BYTE) {
for (int i = 0; i < pageSize; i++) {
long offset = i << byteBits;
codec.encode(i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
}
} else if (dataType == DataTypes.SHORT) {
for (int i = 0; i < pageSize; i++) {
long offset = i << shortBits;
codec.encode(i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
}
} else if (dataType == DataTypes.INT) {
for (int i = 0; i < pageSize; i++) {
long offset = i << intBits;
codec.encode(i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
}
} else if (dataType == DataTypes.LONG) {
for (int i = 0; i < pageSize; i++) {
long offset = i << longBits;
codec.encode(i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
}
} else if (dataType == DataTypes.FLOAT) {
for (int i = 0; i < pageSize; i++) {
long offset = i << floatBits;
codec.encode(i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
}
} else if (dataType == DataTypes.DOUBLE) {
for (int i = 0; i < pageSize; i++) {
long offset = i << doubleBits;
codec.encode(i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
}
} else {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
}
@Override public byte[] compress(Compressor compressor) throws MemoryException, IOException {
if (UnsafeMemoryManager.isOffHeap()) {
// use raw compression and copy to byte[]
int inputSize = pageSize * dataType.getSizeInBytes();
int compressedMaxSize = compressor.maxCompressedLength(inputSize);
MemoryBlock compressed =
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize);
long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());
assert outSize < Integer.MAX_VALUE;
byte[] output = new byte[(int) outSize];
CarbonUnsafe.getUnsafe()
.copyMemory(compressed.getBaseObject(), compressed.getBaseOffset(), output,
CarbonUnsafe.BYTE_ARRAY_OFFSET, outSize);
UnsafeMemoryManager.INSTANCE.freeMemory(taskId, compressed);
return output;
} else {
return super.compress(compressor);
}
}
}