blob: 1381dc627b456c637af5c7d5b34f636056cf9114 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.page;
import java.io.IOException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
public abstract class VarLengthColumnPageBase extends ColumnPage {
static final int byteBits = DataTypes.BYTE.getSizeBits();
static final int shortBits = DataTypes.SHORT.getSizeBits();
static final int intBits = DataTypes.INT.getSizeBits();
static final int longBits = DataTypes.LONG.getSizeBits();
// default size for each row, grows as needed
static final int DEFAULT_ROW_SIZE = 8;
static final int DEFAULT_BINARY_SIZE = 512;
static final double FACTOR = 1.25;
final String taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
// memory allocated by Unsafe
MemoryBlock memoryBlock;
// base address of memoryBlock
Object baseAddress;
// the offset of row in the unsafe memory, its size is pageSize + 1
ColumnPage rowOffset;
// the length of bytes added in the page
int totalLength;
// base offset of memoryBlock
long baseOffset;
// size of the allocated memory, in bytes
int capacity;
VarLengthColumnPageBase(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
super(columnPageEncoderMeta, pageSize);
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance(
columnPageEncoderMeta.getColumnSpec().getFieldName(), DataTypes.INT, ColumnType.MEASURE);
try {
rowOffset = ColumnPage.newPage(
new ColumnPageEncoderMeta(spec, DataTypes.INT, columnPageEncoderMeta.getCompressorName()),
pageSize + 1);
} catch (MemoryException e) {
throw new RuntimeException(e);
}
totalLength = 0;
}
@Override
public void setBytePage(byte[] byteData) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setShortPage(short[] shortData) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setShortIntPage(byte[] shortIntData) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setIntPage(int[] intData) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setLongPage(long[] longData) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setFloatPage(float[] floatData) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setDoublePage(double[] doubleData) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
/**
* Create a new column page for decimal page
*/
public static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
byte[] lvEncodedBytes, int actualDataLength) throws MemoryException {
TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
DecimalConverterFactory.DecimalConverter decimalConverter =
DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
columnSpec.getScale());
int size = decimalConverter.getSize();
if (size < 0) {
return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
} else {
// Here the size is always fixed.
return getDecimalColumnPage(meta, lvEncodedBytes, size, actualDataLength);
}
}
/**
* Create a new column page based on the LV (Length Value) encoded bytes
*/
static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
int lvLength, String compressorName) throws MemoryException {
return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
lvLength, compressorName);
}
/**
* Create a new column page based on the LV (Length Value) encoded bytes
*/
static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
byte[] lvEncodedBytes, int lvLength, String compressorName) throws MemoryException {
return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
lvLength, compressorName);
}
private static ColumnPage getDecimalColumnPage(ColumnPageEncoderMeta meta,
byte[] lvEncodedBytes, int size, int actualDataLength) throws MemoryException {
TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
String compressorName = meta.getCompressorName();
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
ColumnPage rowOffset = ColumnPage.newPage(
new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
int offset;
int rowId = 0;
int counter = 0;
for (offset = 0; offset < actualDataLength; offset += size) {
rowOffset.putInt(counter, offset);
rowId++;
counter++;
}
rowOffset.putInt(counter, offset);
VarLengthColumnPageBase page;
if (isUnsafeEnabled(meta)) {
page = new UnsafeDecimalColumnPage(
new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
rowId);
} else {
page = new SafeDecimalColumnPage(
new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
rowId);
}
// set total length and rowOffset in page
page.totalLength = offset;
page.rowOffset.freeMemory();
page.rowOffset = rowOffset;
for (int i = 0; i < rowId; i++) {
page.putBytes(i, lvEncodedBytes, i * size, size);
}
return page;
}
private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
throws MemoryException {
// extract length and data, set them to rowOffset and unsafe memory correspondingly
int rowId = 0;
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
ColumnPage rowOffset = ColumnPage.newPage(
new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
int length;
int offset;
int lvEncodedOffset = 0;
int counter = 0;
// extract Length field in input and calculate total length
for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) {
length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
rowOffset.putInt(counter, offset);
lvEncodedOffset += lvLength + length;
rowId++;
counter++;
}
rowOffset.putInt(counter, offset);
return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
lvLength, rowId, rowOffset, offset, compressorName);
}
private static ColumnPage getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
throws MemoryException {
// extract length and data, set them to rowOffset and unsafe memory correspondingly
int rowId = 0;
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
ColumnPage rowOffset = ColumnPage.newPage(
new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
int length;
int offset;
int lvEncodedOffset = 0;
int counter = 0;
// extract Length field in input and calculate total length
for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) {
if (lvLength == CarbonCommonConstants.INT_SIZE_IN_BYTE) {
length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
rowOffset.putInt(counter, offset);
lvEncodedOffset += lvLength + length;
} else {
length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset);
rowOffset.putInt(counter, offset);
lvEncodedOffset += lvLength + length;
}
rowId++;
counter++;
}
rowOffset.putInt(counter, offset);
return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
lvLength, rowId, rowOffset, offset, compressorName);
}
private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec,
byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, ColumnPage rowOffset,
int offset, String compressorName) throws MemoryException {
int lvEncodedOffset;
int length;
int numRows = rowId;
VarLengthColumnPageBase page;
int inputDataLength = offset;
if (unsafe) {
page = new UnsafeDecimalColumnPage(
new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows,
inputDataLength);
} else {
page = new SafeDecimalColumnPage(
new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows);
}
// set total length and rowOffset in page
page.totalLength = offset;
page.rowOffset.freeMemory();
page.rowOffset = rowOffset;
// set data in page
lvEncodedOffset = 0;
for (int i = 0; i < numRows; i++) {
length = rowOffset.getInt(i + 1) - rowOffset.getInt(i);
page.putBytes(i, lvEncodedBytes, lvEncodedOffset + lvLength, length);
lvEncodedOffset += lvLength + length;
}
return page;
}
@Override
public void putByte(int rowId, byte value) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putShort(int rowId, short value) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putShortInt(int rowId, int value) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putInt(int rowId, int value) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putLong(int rowId, long value) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putDouble(int rowId, double value) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putFloat(int rowId, float value) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
abstract void putBytesAtRow(int rowId, byte[] bytes);
@Override
public void putBytes(int rowId, byte[] bytes) {
// rowId * 4 represents the length of L in LV
if (bytes.length > (Integer.MAX_VALUE - totalLength - rowId * 4)) {
// since we later store a column page in a byte array, so its maximum size is 2GB
throw new RuntimeException("Carbondata only support maximum 2GB size for one column page,"
+ " exceed this limit at rowId " + rowId);
}
if (rowId == 0) {
rowOffset.putInt(0, 0);
}
rowOffset.putInt(rowId + 1, rowOffset.getInt(rowId) + bytes.length);
putBytesAtRow(rowId, bytes);
totalLength += bytes.length;
}
@Override
public byte getByte(int rowId) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public short getShort(int rowId) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public int getShortInt(int rowId) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public int getInt(int rowId) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public long getLong(int rowId) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public float getFloat(int rowId) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public double getDouble(int rowId) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public byte[] getBytePage() {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public short[] getShortPage() {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public byte[] getShortIntPage() {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public int[] getIntPage() {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public long[] getLongPage() {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public float[] getFloatPage() {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public double[] getDoublePage() {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public byte[] getDecimalPage() {
// output LV encoded byte array
int offset = 0;
byte[] data = new byte[totalLength];
for (int rowId = 0; rowId < pageSize; rowId++) {
int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId);
copyBytes(rowId, data, offset, length);
offset += length;
}
return data;
}
/**
* Copy `length` bytes from data at rowId to dest start from destOffset
*/
abstract void copyBytes(int rowId, byte[] dest, int destOffset, int length);
@Override
public byte[] getLVFlattenedBytePage() throws IOException {
// output LV encoded byte array
int offset = 0;
byte[] data = new byte[totalLength + ((rowOffset.getActualRowCount() - 1) * 4)];
for (int rowId = 0; rowId < rowOffset.getActualRowCount() - 1; rowId++) {
int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId);
ByteUtil.setInt(data, offset, length);
copyBytes(rowId, data, offset + 4, length);
offset += 4 + length;
}
return data;
}
@Override
public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
// output LV encoded byte array
int offset = 0;
int outputLength;
if (dataType == DataTypes.BYTE_ARRAY) {
outputLength = totalLength + ((rowOffset.getActualRowCount() - 1)
* CarbonCommonConstants.INT_SIZE_IN_BYTE);
} else {
outputLength = totalLength + ((rowOffset.getActualRowCount() - 1)
* CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
}
byte[] data = new byte[outputLength];
for (int rowId = 0; rowId < rowOffset.getActualRowCount() - 1; rowId++) {
if (dataType == DataTypes.BYTE_ARRAY) {
int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId);
ByteUtil.setInt(data, offset, length);
copyBytes(rowId, data, offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, length);
offset += CarbonCommonConstants.INT_SIZE_IN_BYTE + length;
} else {
short length = (short) (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId));
ByteUtil.setShort(data, offset, length);
copyBytes(rowId, data, offset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE, length);
offset += CarbonCommonConstants.SHORT_SIZE_IN_BYTE + length;
}
}
return data;
}
@Override
public byte[] getComplexParentFlattenedBytePage() throws IOException {
// output LV encoded byte array
int offset = 0;
byte[] data = new byte[totalLength];
for (int rowId = 0; rowId < rowOffset.getActualRowCount() - 1; rowId++) {
int length = (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId));
copyBytes(rowId, data, offset, length);
offset += length;
}
return data;
}
@Override
public void convertValue(ColumnPageValueConverter codec) {
throw new UnsupportedOperationException(
"invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
/**
* reallocate memory if capacity length than current size + request size
*/
protected void ensureMemory(int requestSize) throws MemoryException {
if (totalLength + requestSize > capacity) {
int newSize = Math.max(2 * capacity, totalLength + requestSize);
MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
memoryBlock = newBlock;
baseAddress = newBlock.getBaseObject();
baseOffset = newBlock.getBaseOffset();
capacity = newSize;
}
}
/**
* free memory as needed
*/
public void freeMemory() {
if (null != rowOffset) {
rowOffset.freeMemory();
rowOffset = null;
}
}
@Override
public long getPageLengthInBytes() throws IOException {
return totalLength;
}
}