/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.indexstore.blockletindex;
import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
/**
 * DataMap implementation at block level (CACHE_LEVEL=BLOCK): one entry per carbondata block.
 */
public class BlockDataMap extends CoarseGrainDataMap
implements BlockletDataMapRowIndexes, Serializable {
private static final Logger LOGGER =
LogServiceFactory.getLogService(BlockDataMap.class.getName());
protected static final long serialVersionUID = -2170289352240810993L;
/**
 * for CACHE_LEVEL=BLOCK and legacy store the default blocklet id will be -1
 */
private static final short BLOCK_DEFAULT_BLOCKLET_ID = -1;
/**
* store which will hold all the block or blocklet entries in one task
*/
protected AbstractMemoryDMStore memoryDMStore;
/**
* task summary holder store
*/
protected AbstractMemoryDMStore taskSummaryDMStore;
/**
 * wrapper over the segmentProperties and schema held in the segmentProperties holder
 */
protected transient SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
segmentPropertiesWrapper;
/**
 * flag indicating a store written by version 1.1 or any prior version
 */
protected boolean isLegacyStore;
/**
 * flag to be used for forming the complete file path from the file name. It will be true in
 * case of partition tables and non-transactional tables
 */
protected boolean isFilePathStored;
@Override
public void init(DataMapModel dataMapModel)
throws IOException, MemoryException {
long startTime = System.currentTimeMillis();
assert (dataMapModel instanceof BlockletDataMapModel);
BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
DataFileFooterConverter fileFooterConverter =
new DataFileFooterConverter(dataMapModel.getConfiguration());
List<DataFileFooter> indexInfo = null;
if (blockletDataMapInfo.getIndexInfos() == null || blockletDataMapInfo.getIndexInfos()
.isEmpty()) {
indexInfo = fileFooterConverter
.getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
blockletDataMapInfo.getCarbonTable().isTransactionalTable());
} else {
// when index info is already read and converted to data file footer object
indexInfo = blockletDataMapInfo.getIndexInfos();
}
Path path = new Path(blockletDataMapInfo.getFilePath());
// store the file path only in case of partition tables, non-transactional tables and flat
// folder structure
byte[] filePath;
boolean isPartitionTable = blockletDataMapInfo.getCarbonTable().isHivePartitionTable();
if (isPartitionTable || !blockletDataMapInfo.getCarbonTable().isTransactionalTable() ||
blockletDataMapInfo.getCarbonTable().isSupportFlatFolder() ||
// if the segment data is not written inside the table path then the parent path needs to
// be stored.
!blockletDataMapInfo.getFilePath().startsWith(
blockletDataMapInfo.getCarbonTable().getTablePath())) {
filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
isFilePathStored = true;
} else {
filePath = new byte[0];
}
byte[] fileName = path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
byte[] segmentId =
blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
if (!indexInfo.isEmpty()) {
DataFileFooter fileFooter = indexInfo.get(0);
// store for 1.1 or any prior version will not have any blocklet information in file footer
isLegacyStore = fileFooter.getBlockletList() == null;
// init segment properties and create schema
SegmentProperties segmentProperties = initSegmentProperties(blockletDataMapInfo, fileFooter);
createMemorySchema(blockletDataMapInfo);
createSummaryDMStore(blockletDataMapInfo);
CarbonRowSchema[] taskSummarySchema = getTaskSummarySchema();
// check for legacy store and load the metadata
DataMapRowImpl summaryRow =
loadMetadata(taskSummarySchema, segmentProperties, blockletDataMapInfo, indexInfo);
finishWriting(taskSummarySchema, filePath, fileName, segmentId, summaryRow);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
+ (System.currentTimeMillis() - startTime));
}
}
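/**
 * Write the task summary row (with file path, file name and segment id) into the summary
 * store and seal both stores against further writes
 */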
private void finishWriting(CarbonRowSchema[] taskSummarySchema, byte[] filePath, byte[] fileName,
byte[] segmentId, DataMapRowImpl summaryRow) throws MemoryException {
if (memoryDMStore != null) {
memoryDMStore.finishWriting();
}
if (null != taskSummaryDMStore) {
addTaskSummaryRowToUnsafeMemoryStore(taskSummarySchema, summaryRow, filePath, fileName,
segmentId);
taskSummaryDMStore.finishWriting();
}
}
/**
 * Method to check whether the store is a legacy store and load metadata accordingly
 *
 * @param taskSummarySchema
 * @param segmentProperties
 * @param blockletDataMapInfo
 * @param indexInfo
 * @throws IOException
 * @throws MemoryException
 */
protected DataMapRowImpl loadMetadata(CarbonRowSchema[] taskSummarySchema,
SegmentProperties segmentProperties, BlockletDataMapModel blockletDataMapInfo,
List<DataFileFooter> indexInfo) throws IOException, MemoryException {
if (isLegacyStore) {
return loadBlockInfoForOldStore(taskSummarySchema, segmentProperties, blockletDataMapInfo,
indexInfo);
} else {
return loadBlockMetaInfo(taskSummarySchema, segmentProperties, blockletDataMapInfo,
indexInfo);
}
}
/**
 * initialise segment properties
 *
 * @param blockletDataMapInfo
 * @param fileFooter
 * @throws IOException
 */
private SegmentProperties initSegmentProperties(BlockletDataMapModel blockletDataMapInfo,
DataFileFooter fileFooter) throws IOException {
List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
int[] columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
segmentPropertiesWrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.addSegmentProperties(blockletDataMapInfo.getCarbonTable(),
columnInTable, columnCardinality, blockletDataMapInfo.getSegmentId());
return segmentPropertiesWrapper.getSegmentProperties();
}
/**
 * This is the old store scenario where blocklet information is not available in the index
 * file, so only block info is loaded. Old store refers to a store written by version 1.1
 * or any prior version
 *
 * @param blockletDataMapInfo
 * @param indexInfo
 * @throws IOException
 * @throws MemoryException
 */
protected DataMapRowImpl loadBlockInfoForOldStore(CarbonRowSchema[] taskSummarySchema,
SegmentProperties segmentProperties, BlockletDataMapModel blockletDataMapInfo,
List<DataFileFooter> indexInfo) throws IOException, MemoryException {
DataMapRowImpl summaryRow = null;
CarbonRowSchema[] schema = getFileFooterEntrySchema();
boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
FilterUtil.setMinMaxFlagForLegacyStore(minMaxFlag, segmentProperties);
long totalRowCount = 0;
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath());
// Here it loads info about all blocklets of the index.
// Only add if the file exists physically. There are scenarios in which the index file exists
// inside the merge index but the related carbondata files are deleted. In that case we first
// check whether the file exists physically or not
if (null != blockMetaInfo) {
BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, fileFooter, segmentProperties,
getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow,
blockMetaInfo, minMaxIndex.getMinValues(), minMaxIndex.getMaxValues(), minMaxFlag);
totalRowCount += fileFooter.getNumberOfRows();
}
}
List<Short> blockletCountList = new ArrayList<>();
blockletCountList.add((short) 0);
byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountList);
// set the total row count
summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties, minMaxFlag);
return summaryRow;
}
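/**
 * Set the min/max flags in the task summary row, restricted to the columns for which
 * min/max is cached
 */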
protected void setMinMaxFlagForTaskSummary(DataMapRow summaryRow,
CarbonRowSchema[] taskSummarySchema, SegmentProperties segmentProperties,
boolean[] minMaxFlag) {
// add min max flag for all the dimension columns
boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, getMinMaxCacheColumns(),
minMaxFlag);
addMinMaxFlagValues(summaryRow, taskSummarySchema[TASK_MIN_MAX_FLAG],
minMaxFlagValuesForColumnsToBeCached, TASK_MIN_MAX_FLAG);
}
/**
* Method to load block metadata information
*
* @param blockletDataMapInfo
* @param indexInfo
* @throws IOException
* @throws MemoryException
*/
private DataMapRowImpl loadBlockMetaInfo(CarbonRowSchema[] taskSummarySchema,
SegmentProperties segmentProperties, BlockletDataMapModel blockletDataMapInfo,
List<DataFileFooter> indexInfo) throws IOException, MemoryException {
String tempFilePath = null;
DataFileFooter previousDataFileFooter = null;
int footerCounter = 0;
byte[][] blockMinValues = null;
byte[][] blockMaxValues = null;
DataMapRowImpl summaryRow = null;
List<Short> blockletCountInEachBlock = new ArrayList<>(indexInfo.size());
short totalBlockletsInOneBlock = 0;
boolean isLastFileFooterEntryNeedToBeAdded = false;
CarbonRowSchema[] schema = getFileFooterEntrySchema();
// flag for each block entry
boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
// min max flag for task summary
boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(taskSummaryMinMaxFlag, true);
long totalRowCount = 0;
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath());
footerCounter++;
if (blockMetaInfo != null) {
// this variable will be used for adding the DataMapRow entry every time a unique block
// path is encountered
if (null == tempFilePath) {
tempFilePath = blockInfo.getFilePath();
// 1st time assign the min and max values from the current file footer
blockMinValues = fileFooter.getBlockletIndex().getMinMaxIndex().getMinValues();
blockMaxValues = fileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues();
updateMinMaxFlag(fileFooter, minMaxFlag);
updateMinMaxFlag(fileFooter, taskSummaryMinMaxFlag);
previousDataFileFooter = fileFooter;
totalBlockletsInOneBlock++;
} else if (blockInfo.getFilePath().equals(tempFilePath)) {
// After iterating over all the blocklets that belong to one block we need to compute the
// min and max at block level. So compare min and max values and update if required
BlockletMinMaxIndex currentFooterMinMaxIndex =
fileFooter.getBlockletIndex().getMinMaxIndex();
blockMinValues =
compareAndUpdateMinMax(currentFooterMinMaxIndex.getMinValues(), blockMinValues, true);
blockMaxValues =
compareAndUpdateMinMax(currentFooterMinMaxIndex.getMaxValues(), blockMaxValues,
false);
updateMinMaxFlag(fileFooter, minMaxFlag);
updateMinMaxFlag(fileFooter, taskSummaryMinMaxFlag);
totalBlockletsInOneBlock++;
}
// as one task contains entries for all the blocklets, we need to iterate and load an entry
// only for each unique file path, because each unique path corresponds to one
// block in the task. The OR condition handles the loading of the last file footer
if (!blockInfo.getFilePath().equals(tempFilePath) || footerCounter == indexInfo.size()) {
TableBlockInfo previousBlockInfo =
previousDataFileFooter.getBlockInfo().getTableBlockInfo();
summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, previousDataFileFooter,
segmentProperties, getMinMaxCacheColumns(), previousBlockInfo.getFilePath(),
summaryRow,
blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()),
blockMinValues, blockMaxValues, minMaxFlag);
totalRowCount += previousDataFileFooter.getNumberOfRows();
minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
// flag to check whether the last file footer entry is different from the previous entry.
// If yes then it needs to be added at the end
isLastFileFooterEntryNeedToBeAdded =
(footerCounter == indexInfo.size()) && (!blockInfo.getFilePath()
.equals(tempFilePath));
// assign local variables values using the current file footer
tempFilePath = blockInfo.getFilePath();
blockMinValues = fileFooter.getBlockletIndex().getMinMaxIndex().getMinValues();
blockMaxValues = fileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues();
updateMinMaxFlag(fileFooter, minMaxFlag);
updateMinMaxFlag(fileFooter, taskSummaryMinMaxFlag);
previousDataFileFooter = fileFooter;
blockletCountInEachBlock.add(totalBlockletsInOneBlock);
// for the next block the count will start from 1 because a row is created whenever a new
// file path is encountered. Here a new file path has already come, so the count starts from 1
totalBlockletsInOneBlock = 1;
}
}
}
// add the last file footer entry
if (isLastFileFooterEntryNeedToBeAdded) {
summaryRow =
loadToUnsafeBlock(schema, taskSummarySchema, previousDataFileFooter, segmentProperties,
getMinMaxCacheColumns(),
previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath(), summaryRow,
blockletDataMapInfo.getBlockMetaInfoMap()
.get(previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath()),
blockMinValues, blockMaxValues, minMaxFlag);
totalRowCount += previousDataFileFooter.getNumberOfRows();
blockletCountInEachBlock.add(totalBlockletsInOneBlock);
}
byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountInEachBlock);
// set the total row count
summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
// blocklet count index is the last index
summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
taskSummaryMinMaxFlag);
return summaryRow;
}
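/**
 * Update the given min/max flag array from the blocklet min/max index of the file footer
 */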
protected void updateMinMaxFlag(DataFileFooter fileFooter, boolean[] minMaxFlag) {
BlockletDataMapUtil
.updateMinMaxFlag(fileFooter.getBlockletIndex().getMinMaxIndex(), minMaxFlag);
}
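/**
 * Serialize the per-block blocklet counts into a byte array, 2 bytes (one short) per block.
 * For example, counts {4, 3} become a 4 byte array holding the shorts 4 and 3
 */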
private byte[] convertRowCountFromShortToByteArray(List<Short> blockletCountInEachBlock) {
int bufferSize = blockletCountInEachBlock.size() * 2;
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
for (Short blockletCount : blockletCountInEachBlock) {
byteBuffer.putShort(blockletCount);
}
byteBuffer.rewind();
return byteBuffer.array();
}
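/**
 * Store the block locations as a comma separated UTF-8 string at the given ordinal
 */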
protected void setLocations(String[] locations, DataMapRow row, int ordinal)
throws UnsupportedEncodingException {
// Add location info
String locationStr = StringUtils.join(locations, ',');
row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal);
}
/**
 * Load information for the block. This case can happen only for old stores where blocklet
 * information is not available in the index file, so only block information is loaded and
 * blocklet information is read in the executor.
 */
protected DataMapRowImpl loadToUnsafeBlock(CarbonRowSchema[] schema,
CarbonRowSchema[] taskSummarySchema, DataFileFooter fileFooter,
SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns, String filePath,
DataMapRowImpl summaryRow, BlockMetaInfo blockMetaInfo, byte[][] minValues,
byte[][] maxValues, boolean[] minMaxFlag) {
// Add one row to maintain task level min max for segment pruning
if (summaryRow == null) {
summaryRow = new DataMapRowImpl(taskSummarySchema);
}
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
int taskMinMaxOrdinal = 1;
// get min max values for columns to be cached
byte[][] minValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minValues);
byte[][] maxValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, maxValues);
boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minMaxFlag);
row.setRow(addMinMax(schema[ordinal], minValuesForColumnsToBeCached), ordinal);
// compute and set task level min values
addTaskMinMaxValues(summaryRow, taskSummarySchema, taskMinMaxOrdinal,
minValuesForColumnsToBeCached, TASK_MIN_VALUES_INDEX, true);
ordinal++;
taskMinMaxOrdinal++;
row.setRow(addMinMax(schema[ordinal], maxValuesForColumnsToBeCached), ordinal);
// compute and set task level max values
addTaskMinMaxValues(summaryRow, taskSummarySchema, taskMinMaxOrdinal,
maxValuesForColumnsToBeCached, TASK_MAX_VALUES_INDEX, false);
ordinal++;
// add total rows in one carbondata file
row.setInt((int) fileFooter.getNumberOfRows(), ordinal++);
// add file name
byte[] filePathBytes =
getFileNameFromPath(filePath).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
row.setByteArray(filePathBytes, ordinal++);
// add version number
row.setShort(fileFooter.getVersionId().number(), ordinal++);
// add schema updated time
row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
// add block offset
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
try {
setLocations(blockMetaInfo.getLocationInfo(), row, ordinal++);
// store block size
row.setLong(blockMetaInfo.getSize(), ordinal++);
// add min max flag for all the dimension columns
addMinMaxFlagValues(row, schema[ordinal], minMaxFlagValuesForColumnsToBeCached, ordinal);
memoryDMStore.addIndexRow(schema, row);
} catch (Exception e) {
String message = "Load to unsafe failed for block: " + filePath;
LOGGER.error(message, e);
throw new RuntimeException(message, e);
}
return summaryRow;
}
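/**
 * Add the boolean min/max flags as a nested row at the given ordinal of the DataMap row
 */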
protected void addMinMaxFlagValues(DataMapRow row, CarbonRowSchema carbonRowSchema,
boolean[] minMaxFlag, int ordinal) {
CarbonRowSchema[] minMaxFlagSchema =
((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
DataMapRow minMaxFlagRow = new DataMapRowImpl(minMaxFlagSchema);
int flagOrdinal = 0;
// add the min/max flag of each column
for (int i = 0; i < minMaxFlag.length; i++) {
minMaxFlagRow.setBoolean(minMaxFlag[i], flagOrdinal++);
}
row.setRow(minMaxFlagRow, ordinal);
}
protected String getFileNameFromPath(String filePath) {
return CarbonTablePath.getCarbonDataFileName(filePath);
}
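/**
 * Return the folder path containing the data files: the stored parent path when the file
 * path is cached, otherwise the segment path derived from the table path and segment id
 */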
protected String getFilePath() {
if (isFilePathStored) {
return getTableTaskInfo(SUMMARY_INDEX_PATH);
}
// create the segment directory path
String tablePath = segmentPropertiesWrapper.getTableIdentifier().getTablePath();
String segmentId = getTableTaskInfo(SUMMARY_SEGMENTID);
return CarbonTablePath.getSegmentPath(tablePath, segmentId);
}
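/**
 * Form the complete carbondata file path: folder path + stored file name + carbondata
 * file extension
 */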
protected String getFileNameWithFilePath(DataMapRow dataMapRow, String filePath) {
String fileName = filePath + CarbonCommonConstants.FILE_SEPARATOR + new String(
dataMapRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+ CarbonTablePath.getCarbonDataExtension();
return FileFactory.getUpdatedFilePath(fileName);
}
private void addTaskSummaryRowToUnsafeMemoryStore(CarbonRowSchema[] taskSummarySchema,
DataMapRow summaryRow, byte[] filePath, byte[] fileName, byte[] segmentId) {
// write the task summary info to unsafe memory store
if (null != summaryRow) {
summaryRow.setByteArray(fileName, SUMMARY_INDEX_FILE_NAME);
summaryRow.setByteArray(segmentId, SUMMARY_SEGMENTID);
if (filePath.length > 0) {
summaryRow.setByteArray(filePath, SUMMARY_INDEX_PATH);
}
try {
taskSummaryDMStore.addIndexRow(taskSummarySchema, summaryRow);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
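/**
 * Create a nested row holding the given min (or max) values, one byte array per column
 */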
protected DataMapRow addMinMax(CarbonRowSchema carbonRowSchema,
byte[][] minValues) {
CarbonRowSchema[] minSchemas =
((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
DataMapRow minRow = new DataMapRowImpl(minSchemas);
int minOrdinal = 0;
// add the min (or max) value of each column; the method is reused for max values
for (int i = 0; i < minValues.length; i++) {
minRow.setByteArray(minValues[i], minOrdinal++);
}
return minRow;
}
/**
* This method will compute min/max values at task level
*
* @param taskMinMaxRow
* @param carbonRowSchema
* @param taskMinMaxOrdinal
* @param minMaxValue
* @param ordinal
* @param isMinValueComparison
*/
protected void addTaskMinMaxValues(DataMapRow taskMinMaxRow, CarbonRowSchema[] carbonRowSchema,
int taskMinMaxOrdinal, byte[][] minMaxValue, int ordinal, boolean isMinValueComparison) {
DataMapRow row = taskMinMaxRow.getRow(ordinal);
byte[][] updatedMinMaxValues = null;
if (null == row) {
CarbonRowSchema[] minSchemas =
((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema[taskMinMaxOrdinal])
.getChildSchemas();
row = new DataMapRowImpl(minSchemas);
updatedMinMaxValues = minMaxValue;
} else {
byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
updatedMinMaxValues =
compareAndUpdateMinMax(minMaxValue, existingMinMaxValues, isMinValueComparison);
}
int minMaxOrdinal = 0;
// min/max value adding
for (int i = 0; i < updatedMinMaxValues.length; i++) {
row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
}
taskMinMaxRow.setRow(row, ordinal);
}
/**
* This method will do min/max comparison of values and update if required
*
* @param minMaxValueCompare1
* @param minMaxValueCompare2
* @param isMinValueComparison
*/
private byte[][] compareAndUpdateMinMax(byte[][] minMaxValueCompare1,
byte[][] minMaxValueCompare2, boolean isMinValueComparison) {
// Compare and update min max values
byte[][] updatedMinMaxValues = new byte[minMaxValueCompare1.length][];
System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, minMaxValueCompare1.length);
for (int i = 0; i < minMaxValueCompare1.length; i++) {
int compare = ByteUtil.UnsafeComparer.INSTANCE
.compareTo(minMaxValueCompare2[i], minMaxValueCompare1[i]);
if (isMinValueComparison) {
if (compare < 0) {
updatedMinMaxValues[i] = minMaxValueCompare2[i];
}
} else if (compare > 0) {
updatedMinMaxValues[i] = minMaxValueCompare2[i];
}
}
return updatedMinMaxValues;
}
protected void createMemorySchema(BlockletDataMapModel blockletDataMapModel)
throws MemoryException {
memoryDMStore = getMemoryDMStore(blockletDataMapModel.isAddToUnsafe());
}
/**
* Creates the schema to store summary information or the information which can be stored only
* once per datamap. It stores datamap level max/min of each column and partition information of
* datamap
*
* @throws MemoryException
*/
protected void createSummaryDMStore(BlockletDataMapModel blockletDataMapModel)
throws MemoryException {
taskSummaryDMStore = getMemoryDMStore(blockletDataMapModel.isAddToUnsafe());
}
@Override
public boolean isScanRequired(FilterResolverIntf filterExp) {
FilterExecuter filterExecuter = FilterUtil
.getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
DataMapRow unsafeRow = taskSummaryDMStore
.getDataMapRow(getTaskSummarySchema(), taskSummaryDMStore.getRowCount() - 1);
return FilterExpressionProcessor
    .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
        getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX),
        getMinMaxFlag(unsafeRow, TASK_MIN_MAX_FLAG));
}
protected List<CarbonColumn> getMinMaxCacheColumns() {
return segmentPropertiesWrapper.getMinMaxCacheColumns();
}
/**
 * for CACHE_LEVEL=BLOCK, each entry in memoryDMStore is for a block.
 * If the data is not a legacy store, the blocklet count can be read from taskSummaryDMStore
 */
protected short getBlockletNumOfEntry(int index) {
if (isLegacyStore) {
// dummy value
return 0;
} else {
final byte[] bytes = getBlockletRowCountForEachBlock();
// if the segment data is written in the table path
// then the result of getBlockletRowCountForEachBlock will be empty.
if (bytes.length == 0) {
return 0;
} else {
return ByteBuffer.wrap(bytes).getShort(index * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
}
}
}
// get total block number in this datamap
public int getTotalBlocks() {
if (isLegacyStore) {
// dummy value
return 0;
} else {
return memoryDMStore.getRowCount();
}
}
// get total blocklet number in this datamap
protected int getTotalBlocklets() {
ByteBuffer byteBuffer = ByteBuffer.wrap(getBlockletRowCountForEachBlock());
int sum = 0;
while (byteBuffer.hasRemaining()) {
sum += byteBuffer.getShort();
}
return sum;
}
@Override
public long getRowCount(Segment segment, List<PartitionSpec> partitions) {
long totalRowCount =
taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0).getLong(TASK_ROW_COUNT);
if (totalRowCount == 0) {
Map<String, Long> blockletToRowCountMap = new HashMap<>();
getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
for (long blockletRowCount : blockletToRowCountMap.values()) {
totalRowCount += blockletRowCount;
}
} else {
if (taskSummaryDMStore.getRowCount() == 0) {
return 0L;
}
}
return totalRowCount;
}
public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
Map<String, Long> blockletToRowCountMap) {
if (memoryDMStore.getRowCount() == 0) {
return new HashMap<>();
}
// if it has a partitioned datamap but there is no partition information stored, it means
// the partitions are dropped, so return an empty map.
if (partitions != null) {
if (!validatePartitionInfo(partitions)) {
return new HashMap<>();
}
}
CarbonRowSchema[] schema = getFileFooterEntrySchema();
int numEntries = memoryDMStore.getRowCount();
for (int i = 0; i < numEntries; i++) {
DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i);
String fileName = new String(dataMapRow.getByteArray(FILE_PATH_INDEX),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + CarbonTablePath.getCarbonDataExtension();
int rowCount = dataMapRow.getInt(ROW_COUNT_INDEX);
// prepend segment number with the blocklet file path
String blockletMapKey = segment.getSegmentNo() + "," + fileName;
Long existingCount = blockletToRowCountMap.get(blockletMapKey);
if (null != existingCount) {
blockletToRowCountMap.put(blockletMapKey, (long) rowCount + existingCount);
} else {
blockletToRowCountMap.put(blockletMapKey, (long) rowCount);
}
}
return blockletToRowCountMap;
}
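/**
 * Prune the blocks in this datamap with the given filter. A null filter returns all
 * entries, otherwise each entry is matched against the block-level min/max values
 */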
private List<Blocklet> prune(FilterResolverIntf filterExp) {
if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
List<Blocklet> blocklets = new ArrayList<>();
CarbonRowSchema[] schema = getFileFooterEntrySchema();
String filePath = getFilePath();
int numEntries = memoryDMStore.getRowCount();
int totalBlocklets = 0;
if (ExplainCollector.enabled()) {
totalBlocklets = getTotalBlocklets();
}
int hitBlocklets = 0;
if (filterExp == null) {
for (int i = 0; i < numEntries; i++) {
DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i);
blocklets.add(createBlocklet(dataMapRow, getFileNameWithFilePath(dataMapRow, filePath),
getBlockletId(dataMapRow), false));
}
hitBlocklets = totalBlocklets;
} else {
// B-tree jump logic is removed as the start and end keys prepared are not
// correct for old store scenarios
int entryIndex = 0;
FilterExecuter filterExecuter = FilterUtil
.getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
// flag to be used for deciding whether to use min/max in executor pruning for BlockletDataMap
boolean useMinMaxForPruning = useMinMaxForExecutorPruning(filterExp);
// min and max for executor pruning
while (entryIndex < numEntries) {
DataMapRow row = memoryDMStore.getDataMapRow(schema, entryIndex);
boolean[] minMaxFlag = getMinMaxFlag(row, BLOCK_MIN_MAX_FLAG);
String fileName = getFileNameWithFilePath(row, filePath);
short blockletId = getBlockletId(row);
boolean isValid =
addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(row, MAX_VALUES_INDEX),
getMinMaxValue(row, MIN_VALUES_INDEX), minMaxFlag, fileName, blockletId);
if (isValid) {
blocklets.add(createBlocklet(row, fileName, blockletId, useMinMaxForPruning));
if (ExplainCollector.enabled()) {
hitBlocklets += getBlockletNumOfEntry(entryIndex);
}
}
entryIndex++;
}
}
if (ExplainCollector.enabled()) {
if (isLegacyStore) {
ExplainCollector.setShowPruningInfo(false);
} else {
ExplainCollector.setShowPruningInfo(true);
ExplainCollector.addTotalBlocklets(totalBlocklets);
ExplainCollector.addTotalBlocks(getTotalBlocks());
ExplainCollector.addDefaultDataMapPruningHit(hitBlocklets);
}
}
return blocklets;
}
protected boolean useMinMaxForExecutorPruning(FilterResolverIntf filterResolverIntf) {
return false;
}
@Override
public List<Blocklet> prune(Expression expression, SegmentProperties properties,
List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
return prune(new DataMapFilter(properties, carbonTable, expression).getResolver(), properties,
partitions);
}
@Override
public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
List<PartitionSpec> partitions) {
if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
// if it has a partitioned datamap but there is no partition information stored, it means
// the partitions are dropped, so return an empty list.
if (partitions != null) {
if (!validatePartitionInfo(partitions)) {
return new ArrayList<>();
}
}
// Prune with filters if the partitions exist in this datamap.
// segmentProperties was changed to this.segmentProperties to make sure the pruning happens
// with its own segmentProperties.
// It's a temporary fix. The interface DataMap.prune(FilterResolverIntf filterExp,
// SegmentProperties segmentProperties, List<PartitionSpec> partitions) should be corrected
return prune(filterExp);
}
private boolean validatePartitionInfo(List<PartitionSpec> partitions) {
// First get the partitions which are stored inside datamap.
String[] fileDetails = getFileDetails();
// Check the exact match of partition information inside the stored partitions.
boolean found = false;
Path folderPath = new Path(fileDetails[0]);
for (PartitionSpec spec : partitions) {
if (folderPath.equals(spec.getLocation()) && isCorrectUUID(fileDetails, spec)) {
found = true;
break;
}
}
return found;
}
@Override
public void finish() {
}
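/**
 * Check whether the partition spec UUID (in segmentId_timestamp form) matches the segment
 * id and index file timestamp stored in this datamap; a null UUID always matches
 */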
private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) {
boolean needToScan = false;
if (spec.getUuid() != null) {
String[] split = spec.getUuid().split("_");
if (split[0].equals(fileDetails[2]) && CarbonTablePath.DataFileUtil
.getTimeStampFromFileName(fileDetails[1]).equals(split[1])) {
needToScan = true;
}
} else {
needToScan = true;
}
return needToScan;
}
/**
* select the blocks based on column min and max value
*
* @param filterExecuter
* @param maxValue
* @param minValue
* @param minMaxFlag
* @param filePath
* @param blockletId
* @return
*/
private boolean addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, byte[][] maxValue,
byte[][] minValue, boolean[] minMaxFlag, String filePath, int blockletId) {
BitSet bitSet = null;
if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
// blockletId will be -1 in case of old store where the index file does not contain
// blocklet information, so append the id to the path only when it is valid
if (blockletId != -1) {
uniqueBlockPath = uniqueBlockPath + CarbonCommonConstants.FILE_SEPARATOR + blockletId;
}
bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
.isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath, minMaxFlag);
} else {
bitSet = filterExecuter.isScanRequired(maxValue, minValue, minMaxFlag);
}
return !bitSet.isEmpty();
}
public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
if (isLegacyStore) {
throw new UnsupportedOperationException("With legacy store only BlockletDataMap is allowed."
+ " In order to use other dataMaps upgrade to new store.");
}
int absoluteBlockletId = Integer.parseInt(blockletId);
return createBlockletFromRelativeBlockletId(absoluteBlockletId);
}
/**
 * Method to create a blocklet from the absolute blocklet ID. The absolute blocklet ID is the
 * blocklet ID at task level, whereas the relative blocklet ID is the ID within one
 * carbondata file/block
 *
 * @param absoluteBlockletId
 * @return
 */
private ExtendedBlocklet createBlockletFromRelativeBlockletId(int absoluteBlockletId) {
short relativeBlockletId = -1;
int rowIndex = 0;
// return 0 if absoluteBlockletId is 0
if (absoluteBlockletId == 0) {
relativeBlockletId = (short) absoluteBlockletId;
} else {
int diff = absoluteBlockletId;
ByteBuffer byteBuffer = ByteBuffer.wrap(getBlockletRowCountForEachBlock());
// Example: absoluteBlockletId = 17, blockletRowCountForEachBlock = {4,3,2,5,7}
// step1: diff = 17-4, diff = 13
// step2: diff = 13-3, diff = 10
// step3: diff = 10-2, diff = 8
// step4: diff = 8-5, diff = 3
// step5: diff = 3-7, diff = -4 (satisfies diff < 0, so the block is found)
// step6: relativeBlockletId = -4+7 = 3 (the 4th blocklet, zero-based index 3, of the 5th block)
while (byteBuffer.hasRemaining()) {
short blockletCount = byteBuffer.getShort();
diff = diff - blockletCount;
if (diff < 0) {
relativeBlockletId = (short) (diff + blockletCount);
break;
}
rowIndex++;
}
}
DataMapRow row =
memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex);
String filePath = getFilePath();
return createBlocklet(row, getFileNameWithFilePath(row, filePath), relativeBlockletId,
false);
}
private byte[] getBlockletRowCountForEachBlock() {
// taskSummary DM store will have only one row
CarbonRowSchema[] taskSummarySchema = getTaskSummarySchema();
return taskSummaryDMStore
.getDataMapRow(taskSummarySchema, taskSummaryDMStore.getRowCount() - 1)
.getByteArray(taskSummarySchema.length - 1);
}
/**
 * Get task-level summary information (index path, index file name or segment id) stored
 * at the given index
 *
 * @return
 */
public String getTableTaskInfo(int index) {
DataMapRow unsafeRow = taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0);
try {
return new String(unsafeRow.getByteArray(index), CarbonCommonConstants.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException e) {
// should never happen!
throw new IllegalArgumentException("UTF8 encoding is not supported", e);
}
}
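/**
 * Extract the min/max byte arrays from the nested row stored at the given index
 */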
private byte[][] getMinMaxValue(DataMapRow row, int index) {
DataMapRow minMaxRow = row.getRow(index);
byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
for (int i = 0; i < minMax.length; i++) {
minMax[i] = minMaxRow.getByteArray(i);
}
return minMax;
}
private boolean[] getMinMaxFlag(DataMapRow row, int index) {
DataMapRow minMaxFlagRow = row.getRow(index);
boolean[] minMaxFlag = new boolean[minMaxFlagRow.getColumnCount()];
for (int i = 0; i < minMaxFlag.length; i++) {
minMaxFlag[i] = minMaxFlagRow.getBoolean(i);
}
return minMaxFlag;
}
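/**
 * For CACHE_LEVEL=BLOCK the blocklet id is not stored per entry, so return the default id
 * (-1); subclasses working at blocklet level can override this
 */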
protected short getBlockletId(DataMapRow dataMapRow) {
return BLOCK_DEFAULT_BLOCKLET_ID;
}
protected ExtendedBlocklet createBlocklet(DataMapRow row, String fileName, short blockletId,
boolean useMinMaxForPruning) {
short versionNumber = row.getShort(VERSION_INDEX);
ExtendedBlocklet blocklet = new ExtendedBlocklet(fileName, blockletId + "", false,
ColumnarFormatVersion.valueOf(versionNumber));
blocklet.setDataMapRow(row);
blocklet.setColumnCardinality(getColumnCardinality());
blocklet.setLegacyStore(isLegacyStore);
blocklet.setUseMinMaxForPruning(useMinMaxForPruning);
return blocklet;
}
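/**
 * Read the stored index path, index file name and segment id from the task summary row
 */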
private String[] getFileDetails() {
try {
String[] fileDetails = new String[3];
DataMapRow unsafeRow = taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0);
fileDetails[0] = new String(unsafeRow.getByteArray(SUMMARY_INDEX_PATH),
CarbonCommonConstants.DEFAULT_CHARSET);
fileDetails[1] = new String(unsafeRow.getByteArray(SUMMARY_INDEX_FILE_NAME),
CarbonCommonConstants.DEFAULT_CHARSET);
fileDetails[2] = new String(unsafeRow.getByteArray(SUMMARY_SEGMENTID),
CarbonCommonConstants.DEFAULT_CHARSET);
return fileDetails;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void clear() {
if (memoryDMStore != null) {
memoryDMStore.freeMemory();
}
// clear task min/max unsafe memory
if (null != taskSummaryDMStore) {
taskSummaryDMStore.freeMemory();
}
}
public long getMemorySize() {
long memoryUsed = 0L;
if (memoryDMStore != null) {
memoryUsed += memoryDMStore.getMemoryUsed();
}
if (null != taskSummaryDMStore) {
memoryUsed += taskSummaryDMStore.getMemoryUsed();
}
return memoryUsed;
}
protected SegmentProperties getSegmentProperties() {
return segmentPropertiesWrapper.getSegmentProperties();
}
public int[] getColumnCardinality() {
return segmentPropertiesWrapper.getColumnCardinality();
}
public List<ColumnSchema> getColumnSchema() {
return segmentPropertiesWrapper.getColumnsInTable();
}
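/**
 * Create an unsafe (off-heap) or safe (on-heap) memory DM store based on the flag
 */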
protected AbstractMemoryDMStore getMemoryDMStore(boolean addToUnsafe)
throws MemoryException {
AbstractMemoryDMStore memoryDMStore;
if (addToUnsafe) {
memoryDMStore = new UnsafeMemoryDMStore();
} else {
memoryDMStore = new SafeMemoryDMStore();
}
return memoryDMStore;
}
protected CarbonRowSchema[] getFileFooterEntrySchema() {
return segmentPropertiesWrapper.getBlockFileFooterEntrySchema();
}
protected CarbonRowSchema[] getTaskSummarySchema() {
try {
return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true, isFilePathStored);
} catch (MemoryException e) {
throw new RuntimeException(e);
}
}
/**
 * This method will convert a safe memory DM store to an unsafe memory DM store
 *
 * @throws MemoryException
 */
*/
public void convertToUnsafeDMStore() throws MemoryException {
if (memoryDMStore instanceof SafeMemoryDMStore) {
UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore(
getFileFooterEntrySchema());
memoryDMStore.freeMemory();
memoryDMStore = unsafeMemoryDMStore;
}
if (taskSummaryDMStore instanceof SafeMemoryDMStore) {
UnsafeMemoryDMStore unsafeSummaryMemoryDMStore =
taskSummaryDMStore.convertToUnsafeDMStore(getTaskSummarySchema());
taskSummaryDMStore.freeMemory();
taskSummaryDMStore = unsafeSummaryMemoryDMStore;
}
}
public SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper getSegmentPropertiesWrapper() {
return segmentPropertiesWrapper;
}
@Override
public int getNumberOfEntries() {
if (memoryDMStore != null) {
if (memoryDMStore.getRowCount() == 0) {
// so that one datamap is considered as one record
return 1;
} else {
return memoryDMStore.getRowCount();
}
} else {
// legacy store
return 1;
}
}
}