| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.indexstore; |
| |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.indexstore.row.IndexRow; |
| import org.apache.carbondata.core.indexstore.row.UnsafeIndexRow; |
| import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; |
| import org.apache.carbondata.core.memory.CarbonUnsafe; |
| import org.apache.carbondata.core.memory.MemoryBlock; |
| import org.apache.carbondata.core.memory.MemoryType; |
| import org.apache.carbondata.core.memory.UnsafeMemoryManager; |
| import org.apache.carbondata.core.metadata.datatype.DataType; |
| import org.apache.carbondata.core.metadata.datatype.DataTypes; |
| |
| import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; |
| import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe; |
| |
| /** |
| * Store the index row @{@link IndexRow} data to unsafe. |
| */ |
| public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { |
| |
| private static final long serialVersionUID = -5344592407101055335L; |
| |
| private transient MemoryBlock memoryBlock; |
| |
| private static int capacity = 8 * 1024; |
| |
| private int allocatedSize; |
| |
| private int runningLength; |
| |
| private int[] pointers; |
| |
| private int rowCount; |
| |
| private byte[] data; |
| |
| public UnsafeMemoryDMStore() { |
| this.allocatedSize = capacity; |
| this.memoryBlock = |
| UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize); |
| this.pointers = new int[100]; |
| } |
| |
| /** |
| * Check memory is sufficient or not, if not sufficient allocate more memory and copy old data to |
| * new one. |
| * |
| * @param rowSize |
| */ |
| private void ensureSize(int rowSize) { |
| if (runningLength + rowSize >= allocatedSize) { |
| increaseMemory(runningLength + rowSize); |
| } |
| if (this.pointers.length <= rowCount + 1) { |
| int[] newPointer = new int[pointers.length + 100]; |
| System.arraycopy(pointers, 0, newPointer, 0, pointers.length); |
| this.pointers = newPointer; |
| } |
| } |
| |
| private void increaseMemory(int requiredMemory) { |
| MemoryBlock newMemoryBlock = UnsafeMemoryManager |
| .allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize + requiredMemory); |
| getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), this.memoryBlock.getBaseOffset(), |
| newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), runningLength); |
| UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock); |
| allocatedSize = allocatedSize + requiredMemory; |
| this.memoryBlock = newMemoryBlock; |
| } |
| |
| /** |
| * Add the index row to unsafe. |
| * Below format is used to store data in memory block |
| * WRITE: |
| * <FD><FD><FD><VO><VO><VO><LO><VD><VD><VD> |
| * FD: Fixed Column data |
| * VO: Variable column data offset |
| * VD: Variable column data |
| * LO: Last Offset |
| * |
| * Read: |
| * FD: Read directly based of byte position added in CarbonRowSchema |
| * |
| * VD: Read based on below logic |
| * if not last variable column schema |
| * X = read actual variable column offset based on byte position added in CarbonRowSchema |
| * Y = read next variable column offset (next 4 bytes) |
| * get the length |
| * len = (X-Y) |
| * read data from offset X of size len |
| * |
| * if last variable column |
| * X = read actual variable column offset based on byte postion added in CarbonRowSchema |
| * Y = read last offset (next 4 bytes) |
| * get the length |
| * len = (X-Y) |
| * read data from offset X of size len |
| * |
| * @param indexRow |
| */ |
| public void addIndexRow(CarbonRowSchema[] schema, IndexRow indexRow) { |
| // First calculate the required memory to keep the row in unsafe |
| int rowSize = indexRow.getTotalSizeInBytes(); |
| // Check whether allocated memory is sufficient or not. |
| ensureSize(rowSize); |
| int pointer = runningLength; |
| int bytePosition = 0; |
| for (CarbonRowSchema carbonRowSchema : schema) { |
| if (carbonRowSchema.getSchemaType() == CarbonRowSchema.IndexSchemaType.STRUCT) { |
| CarbonRowSchema[] childSchemas = |
| ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); |
| for (int j = 0; j < childSchemas.length; j++) { |
| if (childSchemas[j].getBytePosition() > bytePosition) { |
| bytePosition = childSchemas[j].getBytePosition(); |
| } |
| } |
| } else { |
| if (carbonRowSchema.getBytePosition() > bytePosition) { |
| bytePosition = carbonRowSchema.getBytePosition(); |
| } |
| } |
| } |
| // byte position of Last offset |
| bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE; |
| // start byte position of variable length data |
| int varColPosition = bytePosition + CarbonCommonConstants.INT_SIZE_IN_BYTE; |
| // current position refers to current byte position in memory block |
| int currentPosition; |
| for (int i = 0; i < schema.length; i++) { |
| switch (schema[i].getSchemaType()) { |
| case STRUCT: |
| CarbonRowSchema[] childSchemas = |
| ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas(); |
| IndexRow row = indexRow.getRow(i); |
| for (int j = 0; j < childSchemas.length; j++) { |
| currentPosition = addToUnsafe(childSchemas[j], row, j, pointer, varColPosition); |
| if (currentPosition > 0) { |
| varColPosition = currentPosition; |
| } |
| } |
| break; |
| default: |
| currentPosition = addToUnsafe(schema[i], indexRow, i, pointer, varColPosition); |
| if (currentPosition > 0) { |
| varColPosition = currentPosition; |
| } |
| break; |
| } |
| } |
| // writing the last offset |
| getUnsafe() |
| .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + pointer + bytePosition, |
| varColPosition); |
| // after adding last offset increment the length by 4 bytes as last position |
| // written as INT |
| runningLength += CarbonCommonConstants.INT_SIZE_IN_BYTE; |
| pointers[rowCount++] = pointer; |
| } |
| |
| private int addToUnsafe(CarbonRowSchema schema, IndexRow row, int index, int startOffset, |
| int varPosition) { |
| switch (schema.getSchemaType()) { |
| case FIXED: |
| DataType dataType = schema.getDataType(); |
| if (dataType == DataTypes.BYTE) { |
| getUnsafe().putByte(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), |
| row.getByte(index)); |
| runningLength += row.getSizeInBytes(index); |
| } else if (dataType == DataTypes.BOOLEAN) { |
| getUnsafe().putBoolean(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), |
| row.getBoolean(index)); |
| runningLength += row.getSizeInBytes(index); |
| } else if (dataType == DataTypes.SHORT) { |
| getUnsafe().putShort(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), |
| row.getShort(index)); |
| runningLength += row.getSizeInBytes(index); |
| } else if (dataType == DataTypes.INT) { |
| getUnsafe().putInt(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), |
| row.getInt(index)); |
| runningLength += row.getSizeInBytes(index); |
| } else if (dataType == DataTypes.LONG) { |
| getUnsafe().putLong(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), |
| row.getLong(index)); |
| runningLength += row.getSizeInBytes(index); |
| } else if (dataType == DataTypes.FLOAT) { |
| getUnsafe().putFloat(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), |
| row.getFloat(index)); |
| runningLength += row.getSizeInBytes(index); |
| } else if (dataType == DataTypes.DOUBLE) { |
| getUnsafe().putDouble(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), |
| row.getDouble(index)); |
| runningLength += row.getSizeInBytes(index); |
| } else if (dataType == DataTypes.BYTE_ARRAY) { |
| byte[] data = row.getByteArray(index); |
| getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), data.length); |
| runningLength += row.getSizeInBytes(index); |
| } else { |
| throw new UnsupportedOperationException( |
| "unsupported data type for unsafe storage: " + schema.getDataType()); |
| } |
| return 0; |
| case VARIABLE_SHORT: |
| case VARIABLE_INT: |
| byte[] data = row.getByteArray(index); |
| getUnsafe().putInt(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), varPosition); |
| runningLength += 4; |
| if (data != null) { |
| getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset() + startOffset + varPosition, data.length); |
| runningLength += data.length; |
| varPosition += data.length; |
| } |
| return varPosition; |
| default: |
| throw new UnsupportedOperationException( |
| "unsupported data type for unsafe storage: " + schema.getDataType()); |
| } |
| } |
| |
| public IndexRow getIndexRow(CarbonRowSchema[] schema, int index) { |
| assert (index < rowCount); |
| return new UnsafeIndexRow(schema, memoryBlock, pointers[index]); |
| } |
| |
| public void finishWriting() { |
| if (runningLength < allocatedSize) { |
| MemoryBlock allocate = |
| UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, runningLength); |
| getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), |
| allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); |
| UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); |
| memoryBlock = allocate; |
| } |
| // Compact pointers. |
| if (rowCount < pointers.length) { |
| int[] newPointer = new int[rowCount]; |
| System.arraycopy(pointers, 0, newPointer, 0, rowCount); |
| this.pointers = newPointer; |
| } |
| } |
| |
| public void freeMemory() { |
| if (!isMemoryFreed) { |
| UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); |
| isMemoryFreed = true; |
| } |
| } |
| |
| public int getMemoryUsed() { |
| return runningLength; |
| } |
| |
| public int getRowCount() { |
| return rowCount; |
| } |
| |
| public void serializeMemoryBlock() { |
| this.data = new byte[runningLength]; |
| CarbonUnsafe.getUnsafe().copyMemory(memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset(), data, |
| CarbonUnsafe.BYTE_ARRAY_OFFSET, data.length); |
| freeMemory(); |
| isSerialized = true; |
| } |
| |
| public void copyToMemoryBlock() { |
| this.memoryBlock = |
| UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, this.data.length); |
| isMemoryFreed = false; |
| CarbonUnsafe.getUnsafe() |
| .copyMemory(data, CarbonUnsafe.BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), |
| memoryBlock.getBaseOffset(), this.data.length); |
| isSerialized = false; |
| this.data = null; |
| } |
| } |