blob: 1d8f941395352ac195498911f8504e86f6db938c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.loading.sort.unsafe;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.DataTypeUtil;
/**
 * Keeps rows of prescribed total size in a single {@link MemoryBlock} (off-heap or on-heap,
 * depending on how the block was allocated) and reads them back when needed.
 *
 * <p>Each row is serialized contiguously into the block as:
 * <ol>
 *   <li>for every dimension covered by {@code noDictionaryDimensionMapping}: a 2-byte length
 *   followed by the raw bytes when the flag is {@code true}, otherwise a 4-byte int;</li>
 *   <li>for the remaining dimensions up to {@code dimensionSize} (complex dimensions): a 2-byte
 *   length followed by the raw bytes;</li>
 *   <li>a null bitmap of {@code nullSetWords.length} longs with one bit per measure, set when the
 *   measure is non-null;</li>
 *   <li>each non-null measure in its native width (boolean 1, short 2, int 4, long/double 8
 *   bytes) or, for decimals, a 2-byte length followed by the encoded bytes.</li>
 * </ol>
 * Lengths are stored as {@code short}, so byte values longer than {@link Short#MAX_VALUE}
 * cannot be round-tripped. The start offset of every row, relative to the block's base offset,
 * is recorded in an {@link IntPointerBuffer}.
 *
 * <p>Not safe for concurrent use: every read and write reuses the same scratch null bitmap.
 */
public class UnsafeCarbonRowPage {

  /** Per mapped dimension: {@code true} = length-prefixed bytes, {@code false} = 4-byte int. */
  private final boolean[] noDictionaryDimensionMapping;

  /** Only stored and exposed through {@link #getNoDictionarySortColumnMapping()}. */
  private final boolean[] noDictionarySortColumnMapping;

  /** Total number of dimensions per row, including the trailing complex dimensions. */
  private final int dimensionSize;

  /** Number of measures per row; measures follow the dimensions in the row array. */
  private final int measureSize;

  /** Data type of each measure, indexed by measure ordinal. */
  private final DataType[] measureDataType;

  /** Scratch null bitmap (one bit per measure) shared by addRow, getRow and fillRow. */
  private final long[] nullSetWords;

  /** Offsets of the rows written so far, relative to the data block's base offset. */
  private final IntPointerBuffer buffer;

  /** Bytes written into the data block so far; also the offset of the next row. */
  private int lastSize;

  /** Soft capacity consulted by {@link #canAdd()}. */
  private final long sizeToBeUsed;

  /** Block holding the serialized rows; replaced by {@link #setNewDataBlock(MemoryBlock)}. */
  private MemoryBlock dataBlock;

  private final boolean saveToDisk;

  /** Memory manager that currently owns {@link #dataBlock}; decides how it is freed. */
  private MemoryManagerType managerType;

  /** Task id passed to the pointer buffer and to the memory managers when freeing. */
  private final long taskId;

  /**
   * Creates a page that serializes rows into {@code memoryBlock}.
   *
   * @param noDictionaryDimensionMapping per mapped dimension, {@code true} for a column stored as
   *     length-prefixed bytes and {@code false} for a column stored as an int
   * @param noDictionarySortColumnMapping kept only to be returned by
   *     {@link #getNoDictionarySortColumnMapping()}
   * @param dimensionSize total number of dimensions; those beyond
   *     {@code noDictionaryDimensionMapping.length} are complex dimensions stored as byte[]
   * @param measureSize number of measures per row
   * @param type data type of each measure
   * @param memoryBlock block the rows are written into, initially owned by
   *     {@link UnsafeMemoryManager}
   * @param saveToDisk value returned by {@link #isSaveToDisk()}
   * @param taskId id used for the pointer buffer and when freeing memory
   */
  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
      MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
    this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
    this.dimensionSize = dimensionSize;
    this.measureSize = measureSize;
    this.measureDataType = type;
    this.saveToDisk = saveToDisk;
    // ceil(measureSize / 64) words; yields an empty bitmap when there are no measures.
    this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
    this.taskId = taskId;
    this.buffer = new IntPointerBuffer(this.taskId);
    this.dataBlock = memoryBlock;
    // Only 95% of the block is offered through canAdd(). addRow() performs no bounds check, so
    // the remaining 5% is headroom for the row written while canAdd() was still true.
    // NOTE(review): this assumes a single row never exceeds 5% of the block — confirm.
    // TODO Maybe we can have a different logic for this safety margin.
    this.sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
    this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
  }

  /**
   * Appends {@code row} after the rows already written and records its offset in the pointer
   * buffer. No capacity check is made here; presumably callers consult {@link #canAdd()} first.
   *
   * @param row dimensions first (byte[] or Integer per the mapping, byte[] for complex
   *     dimensions), then measures (boxed values matching {@code measureDataType}, or null)
   * @return number of bytes the row occupies in the data block
   * @throws RuntimeException if {@code row} is null
   * @throws IllegalArgumentException if a non-null measure has an unsupported data type
   */
  public int addRow(Object[] row) {
    int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
    buffer.set(lastSize);
    lastSize = lastSize + size;
    return size;
  }

  /** Serializes {@code row} at absolute {@code address}; returns the bytes written. */
  private int addRow(Object[] row, long address) {
    if (row == null) {
      throw new RuntimeException("Row is null ??");
    }
    Object baseObject = dataBlock.getBaseObject();
    int size = 0;
    int dimCount = 0;
    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
      if (noDictionaryDimensionMapping[dimCount]) {
        size += putLengthPrefixedBytes(baseObject, address + size, (byte[]) row[dimCount]);
      } else {
        CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, (int) row[dimCount]);
        size += 4;
      }
    }
    // Complex dimensions follow the mapped ones and are always stored as byte[].
    for (; dimCount < dimensionSize; dimCount++) {
      size += putLengthPrefixedBytes(baseObject, address + size, (byte[]) row[dimCount]);
    }
    // Reserve space for the null bitmap now; its contents are only known after the measures
    // have been visited, so it is copied into place at the end.
    Arrays.fill(nullSetWords, 0);
    int nullSetSize = nullSetWords.length * 8;
    int nullWordLoc = size;
    size += nullSetSize;
    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
      Object value = row[mesCount + dimensionSize];
      if (null != value) {
        DataType dataType = measureDataType[mesCount];
        if (dataType == DataTypes.BOOLEAN) {
          CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, (Boolean) value);
          size += 1;
        } else if (dataType == DataTypes.SHORT) {
          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (Short) value);
          size += 2;
        } else if (dataType == DataTypes.INT) {
          CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, (Integer) value);
          size += 4;
        } else if (dataType == DataTypes.LONG) {
          CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, (Long) value);
          size += 8;
        } else if (dataType == DataTypes.DOUBLE) {
          CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, (Double) value);
          size += 8;
        } else if (dataType == DataTypes.DECIMAL) {
          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) value);
          size += putLengthPrefixedBytes(baseObject, address + size, bigDecimalInBytes);
        } else {
          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
        }
        set(nullSetWords, mesCount);
      } else {
        unset(nullSetWords, mesCount);
      }
    }
    CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
        address + nullWordLoc, nullSetSize);
    return size;
  }

  /**
   * Deserializes the row stored at absolute {@code address} into {@code rowToFill}.
   *
   * @param address absolute address of the row (base offset plus the recorded row offset)
   * @param rowToFill array of at least {@code dimensionSize + measureSize} slots; dimensions
   *     are filled with byte[] or Integer, measures with boxed values or null
   * @return {@code rowToFill}
   * @throws IllegalArgumentException if a non-null measure has an unsupported data type
   */
  public Object[] getRow(long address, Object[] rowToFill) {
    Object baseObject = dataBlock.getBaseObject();
    int size = 0;
    int dimCount = 0;
    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
      if (noDictionaryDimensionMapping[dimCount]) {
        byte[] col = getLengthPrefixedBytes(baseObject, address + size);
        size += 2 + col.length;
        rowToFill[dimCount] = col;
      } else {
        rowToFill[dimCount] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
        size += 4;
      }
    }
    // Read complex dimensions.
    for (; dimCount < dimensionSize; dimCount++) {
      byte[] col = getLengthPrefixedBytes(baseObject, address + size);
      size += 2 + col.length;
      rowToFill[dimCount] = col;
    }
    size += loadNullBitmap(baseObject, address + size);
    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
      int ordinal = dimensionSize + mesCount;
      if (!isSet(nullSetWords, mesCount)) {
        rowToFill[ordinal] = null;
        continue;
      }
      DataType dataType = measureDataType[mesCount];
      if (dataType == DataTypes.BOOLEAN) {
        rowToFill[ordinal] = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
        size += 1;
      } else if (dataType == DataTypes.SHORT) {
        rowToFill[ordinal] = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
        size += 2;
      } else if (dataType == DataTypes.INT) {
        rowToFill[ordinal] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
        size += 4;
      } else if (dataType == DataTypes.LONG) {
        rowToFill[ordinal] = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
        size += 8;
      } else if (dataType == DataTypes.DOUBLE) {
        rowToFill[ordinal] = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
        size += 8;
      } else if (dataType == DataTypes.DECIMAL) {
        byte[] bigDecimalInBytes = getLengthPrefixedBytes(baseObject, address + size);
        size += 2 + bigDecimalInBytes.length;
        rowToFill[ordinal] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
      } else {
        throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
      }
    }
    return rowToFill;
  }

  /**
   * Copies the row stored at absolute {@code address} to {@code stream} in the same layout used
   * in memory. Lengths are written as shorts, the null bitmap as longs, and each non-null
   * measure in its native width.
   *
   * @param address absolute address of the row
   * @param stream destination stream
   * @throws IOException if writing to {@code stream} fails
   * @throws IllegalArgumentException if a non-null measure has an unsupported data type
   */
  public void fillRow(long address, DataOutputStream stream) throws IOException {
    Object baseObject = dataBlock.getBaseObject();
    int size = 0;
    int dimCount = 0;
    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
      if (noDictionaryDimensionMapping[dimCount]) {
        byte[] col = getLengthPrefixedBytes(baseObject, address + size);
        size += 2 + col.length;
        stream.writeShort(col.length);
        stream.write(col);
      } else {
        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
        size += 4;
        stream.writeInt(anInt);
      }
    }
    // Copy complex dimensions.
    for (; dimCount < dimensionSize; dimCount++) {
      byte[] col = getLengthPrefixedBytes(baseObject, address + size);
      size += 2 + col.length;
      stream.writeShort(col.length);
      stream.write(col);
    }
    size += loadNullBitmap(baseObject, address + size);
    for (long word : nullSetWords) {
      stream.writeLong(word);
    }
    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
      if (!isSet(nullSetWords, mesCount)) {
        continue;
      }
      DataType dataType = measureDataType[mesCount];
      if (dataType == DataTypes.BOOLEAN) {
        // Handled here for parity with addRow/getRow, which both store BOOLEAN measures.
        // NOTE(review): the reader of this stream must decode it with readBoolean — confirm.
        boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
        size += 1;
        stream.writeBoolean(bval);
      } else if (dataType == DataTypes.SHORT) {
        short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
        size += 2;
        stream.writeShort(sval);
      } else if (dataType == DataTypes.INT) {
        int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
        size += 4;
        stream.writeInt(ival);
      } else if (dataType == DataTypes.LONG) {
        long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
        size += 8;
        stream.writeLong(val);
      } else if (dataType == DataTypes.DOUBLE) {
        double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
        size += 8;
        stream.writeDouble(doubleVal);
      } else if (dataType == DataTypes.DECIMAL) {
        byte[] bigDecimalInBytes = getLengthPrefixedBytes(baseObject, address + size);
        size += 2 + bigDecimalInBytes.length;
        stream.writeShort(bigDecimalInBytes.length);
        stream.write(bigDecimalInBytes);
      } else {
        throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
      }
    }
  }

  /**
   * Writes a 2-byte length (truncated to {@code short}) followed by {@code bytes} at
   * {@code address}.
   *
   * @return number of bytes written, i.e. {@code 2 + bytes.length}
   */
  private static int putLengthPrefixedBytes(Object baseObject, long address, byte[] bytes) {
    CarbonUnsafe.getUnsafe().putShort(baseObject, address, (short) bytes.length);
    CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
        address + 2, bytes.length);
    return 2 + bytes.length;
  }

  /**
   * Reads a 2-byte length and then that many bytes, starting at {@code address}. A negative
   * stored length, which results from a value longer than {@link Short#MAX_VALUE}, makes the
   * array allocation throw {@link NegativeArraySizeException}.
   */
  private static byte[] getLengthPrefixedBytes(Object baseObject, long address) {
    short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address);
    byte[] bytes = new byte[length];
    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + 2, bytes,
        CarbonUnsafe.BYTE_ARRAY_OFFSET, bytes.length);
    return bytes;
  }

  /**
   * Copies the null bitmap stored at {@code address} into {@link #nullSetWords}.
   *
   * @return size of the bitmap in bytes
   */
  private int loadNullBitmap(Object baseObject, long address) {
    int nullSetSize = nullSetWords.length * 8;
    Arrays.fill(nullSetWords, 0);
    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address, nullSetWords,
        CarbonUnsafe.LONG_ARRAY_OFFSET, nullSetSize);
    return nullSetSize;
  }

  /**
   * Returns the data block to the memory manager that currently owns it.
   *
   * <p>The pointer buffer is released only in the {@code UNSAFE_SORT_MEMORY_MANAGER} case.
   * NOTE(review): confirm that {@link IntPointerBuffer} needs no release while the page is
   * still owned by {@link UnsafeMemoryManager}.
   */
  public void freeMemory() {
    switch (managerType) {
      case UNSAFE_MEMORY_MANAGER:
        UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
        break;
      default:
        UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
        buffer.freeMemory();
    }
  }

  /** @return the {@code saveToDisk} flag given at construction */
  public boolean isSaveToDisk() {
    return saveToDisk;
  }

  /** @return the buffer holding each row's offset relative to the data block base offset */
  public IntPointerBuffer getBuffer() {
    return buffer;
  }

  /** @return number of bytes written into the data block so far */
  public int getUsedSize() {
    return lastSize;
  }

  /** @return {@code true} while less than 95% of the original data block has been used */
  public boolean canAdd() {
    return lastSize < sizeToBeUsed;
  }

  /** @return the block currently holding the serialized rows */
  public MemoryBlock getDataBlock() {
    return dataBlock;
  }

  /**
   * Sets bit {@code index} in {@code words}. Java masks a long shift count to its low 6 bits, so
   * {@code 1L << index} selects bit {@code index % 64} of word {@code index / 64}.
   */
  public static void set(long[] words, int index) {
    int wordOffset = (index >> 6);
    words[wordOffset] |= (1L << index);
  }

  /** Clears bit {@code index} in {@code words}; see {@link #set(long[], int)}. */
  public static void unset(long[] words, int index) {
    int wordOffset = (index >> 6);
    words[wordOffset] &= ~(1L << index);
  }

  /** @return whether bit {@code index} is set in {@code words}; see {@link #set(long[], int)} */
  public static boolean isSet(long[] words, int index) {
    int wordOffset = (index >> 6);
    return ((words[wordOffset] & (1L << index)) != 0);
  }

  public boolean[] getNoDictionaryDimensionMapping() {
    return noDictionaryDimensionMapping;
  }

  public boolean[] getNoDictionarySortColumnMapping() {
    return noDictionarySortColumnMapping;
  }

  /**
   * Replaces the data block and marks it as owned by {@link UnsafeSortMemoryManager}, which
   * changes how {@link #freeMemory()} releases it. The previous block is not freed here, and
   * {@code lastSize} and the soft capacity are left unchanged. NOTE(review): presumably the caller
   * has already copied the existing rows into {@code newMemoryBlock} — confirm.
   */
  public void setNewDataBlock(MemoryBlock newMemoryBlock) {
    this.dataBlock = newMemoryBlock;
    this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER;
  }

  /** Memory manager responsible for freeing a page's data block. */
  public enum MemoryManagerType {
    UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
  }
}