/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.scan.executor.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.index.IndexFilter;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode;
import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.util.BlockletIndexUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
/**
 * This class provides a skeletal implementation of the {@link QueryExecutor}
 * interface to minimize the effort required to implement it. It prepares all
 * the properties required for query execution.
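 *
 * <p>A minimal usage sketch (hypothetical driver code; it assumes a concrete
 * subclass such as {@code DetailQueryExecutor} and an already prepared
 * {@code QueryModel}):
 * <pre>{@code
 * QueryExecutor<RowBatch> executor = new DetailQueryExecutor(configuration);
 * CarbonIterator<RowBatch> iterator = executor.execute(queryModel);
 * while (iterator.hasNext()) {
 *   process(iterator.next()); // consume each batch ('process' is a placeholder)
 * }
 * executor.finish(); // releases cached blocks, unsafe memory and the executor service
 * }</pre>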
*/
public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
private static final Logger LOGGER =
LogServiceFactory.getLogService(AbstractQueryExecutor.class.getName());
/**
* holder for query properties which will be used to execute the query
*/
protected QueryExecutorProperties queryProperties;
// whether to clear/free unsafe memory or not
private boolean freeUnsafeMemory;
/**
 * query result iterator which will execute the query
 * and return the result
 */
protected CarbonIterator queryIterator;
public AbstractQueryExecutor(Configuration configuration) {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
queryProperties = new QueryExecutorProperties();
}
public void setExecutorService(ExecutorService executorService) {
// add executor service for query execution
queryProperties.executorService = executorService;
}
/**
 * Fills the executor properties based on the query model: it parses the
 * query model, extracts the details and fills them into the query properties.
 *
 * @param queryModel query model of the query to be executed
 */
protected void initQuery(QueryModel queryModel) throws IOException {
LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
.getCarbonTableIdentifier().getTableName());
this.freeUnsafeMemory = queryModel.isFreeUnsafeMemory();
// initialize the statistics list to record the query statistics;
// a copy-on-write list is used to handle concurrent scenarios
queryProperties.queryStatisticsRecorder = queryModel.getStatisticsRecorder();
if (null == queryProperties.queryStatisticsRecorder) {
queryProperties.queryStatisticsRecorder =
CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
}
QueryStatistic queryStatistic = new QueryStatistic();
// sort the block infos so that the blocks are loaded in sorted order,
// which is required for query execution
Collections.sort(queryModel.getTableBlockInfos());
queryProperties.dataBlocks = getDataBlocks(queryModel);
queryStatistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
// as aggregation will be executed in the following order:
// 1. aggregate dimension expressions
// 2. expressions
// 3. query measures
// the start index of the expressions and the start index of the
// measure columns are calculated here
queryProperties.filterMeasures = new HashSet<>();
queryProperties.complexFilterDimension = new HashSet<>();
if (queryModel.getIndexFilter() != null) {
QueryUtil.getAllFilterDimensionsAndMeasures(queryModel.getIndexFilter().getResolver(),
queryProperties.complexFilterDimension, queryProperties.filterMeasures);
}
queryStatistic = new QueryStatistic();
queryStatistic
.addStatistics(QueryStatisticsConstants.LOAD_DICTIONARY, System.currentTimeMillis());
queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
}
/**
 * Returns the block(s) on which the query will be executed
 *
 * @param queryModel query model of the query to be executed
 * @return list of data blocks on which the query will run
 * @throws IOException
 */
private List<AbstractIndex> getDataBlocks(QueryModel queryModel) throws IOException {
Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
// this is introduced to handle the case when CACHE_LEVEL=BLOCK and a few other indexes,
// like Lucene or Bloom, are created on the table. In that case all the indexes do blocklet
// level pruning and the blockInfo entries are repeated with different blockletIds
Map<String, DataFileFooter> filePathToFileFooterMapping = new HashMap<>();
Map<String, SegmentProperties> filePathToSegmentPropertiesMap = new HashMap<>();
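// group the block infos by file path; all blocklets that belong to the same file will be
// wrapped into a single index entry below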
for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
if (tableBlockInfos == null) {
tableBlockInfos = new ArrayList<>();
listMap.put(blockInfo.getFilePath(), tableBlockInfos);
}
SegmentProperties segmentProperties =
filePathToSegmentPropertiesMap.get(blockInfo.getFilePath());
BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
// This case can come in 3 scenarios:
// 1. old stores (version 1.1 or any prior version) where blocklet information is not
// available, so the blocklet information is read from the block file
// 2. CACHE_LEVEL is set to block
// 3. CACHE_LEVEL is BLOCKLET but the filter column min/max is not cached in the driver
if (null == blockletDetailInfo || blockletDetailInfo.getBlockletInfo() == null
|| blockletDetailInfo.isUseMinMaxForPruning()) {
if (null != blockletDetailInfo) {
blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
}
DataFileFooter fileFooter = filePathToFileFooterMapping.get(blockInfo.getFilePath());
if (null != blockInfo.getDataFileFooter()) {
fileFooter = blockInfo.getDataFileFooter();
}
if (null == fileFooter) {
blockInfo.setDetailInfo(null);
fileFooter = CarbonUtil.readMetadataFile(blockInfo);
// In case of a non-transactional table just set columnUniqueId as columnName to support
// backward compatibility: for non-transactional tables the column uniqueId is always
// equal to the columnName
if (!queryModel.getTable().isTransactionalTable()) {
QueryUtil.updateColumnUniqueIdForNonTransactionTable(fileFooter.getColumnInTable());
}
filePathToFileFooterMapping.put(blockInfo.getFilePath(), fileFooter);
if (null == blockletDetailInfo) {
blockletDetailInfo = QueryUtil.getBlockletDetailInfo(fileFooter, blockInfo);
}
blockInfo.setDetailInfo(blockletDetailInfo);
}
if (null == segmentProperties) {
segmentProperties = new SegmentProperties(fileFooter.getColumnInTable());
createFilterExpression(queryModel, segmentProperties);
updateColumns(queryModel, fileFooter.getColumnInTable(), blockInfo.getFilePath());
filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
}
readAndFillBlockletInfo(tableBlockInfos, blockInfo, blockletDetailInfo, fileFooter);
} else {
if (null == segmentProperties) {
segmentProperties = new SegmentProperties(blockInfo.getDetailInfo().getColumnSchemas());
createFilterExpression(queryModel, segmentProperties);
updateColumns(queryModel, blockInfo.getDetailInfo().getColumnSchemas(),
blockInfo.getFilePath());
filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
}
tableBlockInfos.add(blockInfo);
}
}
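// wrap the blocklets of each file, along with the segment properties resolved for that
// file, into one index node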
List<AbstractIndex> indexList = new ArrayList<>();
for (List<TableBlockInfo> tableBlockInfos : listMap.values()) {
indexList.add(new IndexWrapper(tableBlockInfos,
filePathToSegmentPropertiesMap.get(tableBlockInfos.get(0).getFilePath())));
}
return indexList;
}
/**
 * Updates the dimensions and measures of the query model. In a few scenarios, e.g. via the
 * SDK, the user can configure sort options per load; so if the first load has c1 as an
 * integer column configured as a sort column, carbon treats it as a dimension. But if the
 * user changes the sort option for the second load, c1 becomes a measure, as integers are
 * measures by default. So this method updates measures to dimensions and vice versa as per
 * the index file schema.
 */
private void updateColumns(QueryModel queryModel, List<ColumnSchema> columnsInTable,
String filePath) throws IOException {
if (queryModel.getTable().isTransactionalTable()) {
return;
}
// First validate that no column in the carbondata file has the same name but a different
// datatype compared to the table schema
boolean sameColumnSchemaList = BlockletIndexUtil
.isSameColumnAndDifferentDatatypeInSchema(columnsInTable,
queryModel.getTable().getTableInfo().getFactTable().getListOfColumns());
if (!sameColumnSchemaList) {
LOGGER.error("Datatype of the common columns present in " + filePath + " doesn't match with"
+ "the column's datatype in table schema");
throw new IOException("All common columns present in the files doesn't have same datatype. "
+ "Unsupported operation on nonTransactional table. Check logs.");
}
List<ProjectionDimension> dimensions = queryModel.getProjectionDimensions();
List<ProjectionMeasure> measures = queryModel.getProjectionMeasures();
List<ProjectionDimension> updatedDims = new ArrayList<>();
List<ProjectionMeasure> updatedMsrs = new ArrayList<>();
// Check and update dimensions to measures if they are measures in the index file schema
for (ProjectionDimension dimension : dimensions) {
int index = columnsInTable.indexOf(dimension.getDimension().getColumnSchema());
if (index > -1) {
if (!columnsInTable.get(index).isDimensionColumn()) {
ProjectionMeasure measure = new ProjectionMeasure(
new CarbonMeasure(columnsInTable.get(index), dimension.getDimension().getOrdinal(),
dimension.getDimension().getSchemaOrdinal()));
measure.setOrdinal(dimension.getOrdinal());
updatedMsrs.add(measure);
} else {
updatedDims.add(dimension);
}
} else {
updatedDims.add(dimension);
}
}
// Check and update measures to dimensions if they are dimensions in the index file schema.
for (ProjectionMeasure measure : measures) {
int index = columnsInTable.indexOf(measure.getMeasure().getColumnSchema());
if (index > -1) {
if (columnsInTable.get(index).isDimensionColumn()) {
ProjectionDimension dimension = new ProjectionDimension(
new CarbonDimension(columnsInTable.get(index), measure.getMeasure().getOrdinal(), -1,
measure.getMeasure().getSchemaOrdinal()));
dimension.setOrdinal(measure.getOrdinal());
updatedDims.add(dimension);
} else {
updatedMsrs.add(measure);
}
} else {
updatedMsrs.add(measure);
}
}
// Clear and update the query model projections.
dimensions.clear();
dimensions.addAll(updatedDims);
measures.clear();
measures.addAll(updatedMsrs);
}
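/**
 * Re-resolves the index filter against the given segment properties if it is not already
 * resolved on that segment
 */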
private void createFilterExpression(QueryModel queryModel, SegmentProperties properties) {
if (queryModel.getIndexFilter() != null) {
if (!queryModel.getIndexFilter().isResolvedOnSegment(properties)) {
IndexFilter expression = new IndexFilter(properties, queryModel.getTable(),
queryModel.getIndexFilter().getExpression());
queryModel.setIndexFilter(expression);
}
}
}
/**
 * Reads the file footer of the block file and gets the blocklets to query
 */
private void readAndFillBlockletInfo(List<TableBlockInfo> tableBlockInfos,
TableBlockInfo blockInfo, BlockletDetailInfo blockletDetailInfo, DataFileFooter fileFooter) {
List<BlockletInfo> blockletList = fileFooter.getBlockletList();
// cases when blockletId will be -1:
// 1. in case of a legacy store
// 2. in case CACHE_LEVEL is block and no other index apart from the blocklet index is
// created for the table
// In all the above cases entries will be according to the number of blocks and not
// according to the number of blocklets
if (blockletDetailInfo.getBlockletId() != -1) {
// fill the info only for given blockletId in detailInfo
BlockletInfo blockletInfo = blockletList.get(blockletDetailInfo.getBlockletId());
fillBlockletInfoToTableBlock(tableBlockInfos, blockInfo, fileFooter,
blockletInfo, blockletDetailInfo.getBlockletId());
} else {
short count = 0;
for (BlockletInfo blockletInfo : blockletList) {
fillBlockletInfoToTableBlock(tableBlockInfos, blockInfo, fileFooter,
blockletInfo, count);
count++;
}
}
}
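/**
 * Copies the given block info, fills the blocklet level details (column schemas, row count,
 * blocklet info, blocklet id and page count) into the copy and adds it to the list
 */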
private void fillBlockletInfoToTableBlock(List<TableBlockInfo> tableBlockInfos,
TableBlockInfo blockInfo, DataFileFooter fileFooter, BlockletInfo blockletInfo,
short blockletId) {
TableBlockInfo info = blockInfo.copy();
BlockletDetailInfo detailInfo = info.getDetailInfo();
// set column schema details
detailInfo.setColumnSchemas(fileFooter.getColumnInTable());
detailInfo.setRowCount(blockletInfo.getNumberOfRows());
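// the min/max values below are read and set back unchanged; presumably a leftover from
// legacy format handling where they were transformed at this point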
byte[][] maxValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues();
byte[][] minValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues();
blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues);
blockletInfo.getBlockletIndex().getMinMaxIndex().setMinValues(minValues);
detailInfo.setBlockletInfo(blockletInfo);
detailInfo.setBlockletId(blockletId);
detailInfo.setPagesCount((short) blockletInfo.getNumberOfPages());
tableBlockInfos.add(info);
}
protected List<BlockExecutionInfo> getBlockExecutionInfos(QueryModel queryModel)
throws IOException {
initQuery(queryModel);
List<BlockExecutionInfo> blockExecutionInfoList = new ArrayList<BlockExecutionInfo>();
// fill the block execution infos for all the blocks selected in the query;
// the query will be executed based on those infos
ReusableDataBuffer[] dimensionReusableDataBuffers = null;
ReusableDataBuffer[] measureReusableDataBuffers = null;
for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
AbstractIndex abstractIndex = queryProperties.dataBlocks.get(i);
BlockletDataRefNode dataRefNode =
(BlockletDataRefNode) abstractIndex.getDataRefNode();
final BlockExecutionInfo blockExecutionInfoForBlock =
getBlockExecutionInfoForBlock(
queryModel,
abstractIndex,
dataRefNode.numberOfNodes(),
dataRefNode.getBlockInfos().get(0).getFilePath(),
dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
dataRefNode.getBlockInfos().get(0).getSegment());
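// share the reusable dimension/measure buffers across blocks whenever the buffer counts
// match, so that the buffers are not re-allocated for every block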
if (null == dimensionReusableDataBuffers || null == measureReusableDataBuffers) {
dimensionReusableDataBuffers = blockExecutionInfoForBlock.getDimensionReusableDataBuffer();
measureReusableDataBuffers = blockExecutionInfoForBlock.getMeasureReusableDataBuffer();
} else {
if (dimensionReusableDataBuffers.length == blockExecutionInfoForBlock
.getDimensionReusableDataBuffer().length) {
blockExecutionInfoForBlock.setDimensionReusableDataBuffer(dimensionReusableDataBuffers);
}
if (measureReusableDataBuffers.length == blockExecutionInfoForBlock
.getMeasureReusableDataBuffer().length) {
blockExecutionInfoForBlock.setMeasureReusableDataBuffer(measureReusableDataBuffers);
}
}
blockExecutionInfoList.add(blockExecutionInfoForBlock);
}
if (null != queryModel.getStatisticsRecorder()) {
QueryStatistic queryStatistic = new QueryStatistic();
queryStatistic.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKS_NUM,
blockExecutionInfoList.size());
queryModel.getStatisticsRecorder().recordStatistics(queryStatistic);
}
return blockExecutionInfoList;
}
/**
 * Below method will be used to get the block execution info which is
 * required to execute any block based on the query model
 *
 * @param queryModel query model from the user query
 * @param blockIndex block index
 * @param numberOfBlockletToScan number of blocklets to be scanned in this block
 * @param filePath path of the block file
 * @param deleteDeltaFiles delete delta file paths of the block
 * @param segment segment the block belongs to
 * @return block execution info
 */
private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
AbstractIndex blockIndex, int numberOfBlockletToScan, String filePath,
String[] deleteDeltaFiles, Segment segment) {
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
// set the actual query dimensions and measures; they may differ in restructure scenarios
RestructureUtil.actualProjectionOfSegment(blockExecutionInfo, queryModel, segmentProperties);
// below is to get only those dimensions in the query which are present in the
// table block
List<ProjectionDimension> projectDimensions = RestructureUtil
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
blockExecutionInfo.getActualQueryDimensions(), segmentProperties.getDimensions(),
segmentProperties.getComplexDimensions(),
blockExecutionInfo.getActualQueryMeasures().length,
queryModel.getTable().getTableInfo().isTransactionalTable());
boolean isStandardTable = CarbonUtil.isStandardCarbonTable(queryModel.getTable());
String blockId = CarbonUtil
.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segment.getSegmentNo(),
queryModel.getTable().getTableInfo().isTransactionalTable(),
isStandardTable, queryModel.getTable().isHivePartitionTable());
if (!isStandardTable) {
blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockIdForPartitionTable(blockId));
} else {
blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockId(blockId));
}
blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
blockExecutionInfo.setStartBlockletIndex(0);
blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
blockExecutionInfo.setProjectionDimensions(projectDimensions
.toArray(new ProjectionDimension[0]));
// get measures present in the current block
List<ProjectionMeasure> projectionMeasures = RestructureUtil
.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
blockExecutionInfo.getActualQueryMeasures(), segmentProperties.getMeasures(),
queryModel.getTable().getTableInfo().isTransactionalTable());
blockExecutionInfo.setProjectionMeasures(
projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()]));
blockExecutionInfo.setDataBlock(blockIndex);
// setting whether raw record query or not
blockExecutionInfo.setRawRecordDetailQuery(queryModel.isForcedDetailRawQuery());
// total number of dimensions to read
blockExecutionInfo
.setTotalNumberDimensionToRead(
segmentProperties.getDimensionOrdinalToChunkMapping().size());
blockExecutionInfo.setReadOnlyDelta(queryModel.isReadOnlyDelta());
if (queryModel.isReadPageByPage()) {
blockExecutionInfo.setPrefetchBlocklet(false);
LOGGER.info("Query prefetch is: false, read page by page");
} else {
LOGGER.info("Query prefetch is: " + queryModel.isPreFetchData());
blockExecutionInfo.setPrefetchBlocklet(queryModel.isPreFetchData());
}
// In case of an FG (fine-grained) index the query should not use direct vector fill.
boolean fgIndexPathPresent = false;
for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
fgIndexPathPresent = blockInfo.getIndexWriterPath() != null;
if (fgIndexPathPresent) {
queryModel.setDirectVectorFill(false);
break;
}
}
blockExecutionInfo.setDirectVectorFill(queryModel.isDirectVectorFill());
blockExecutionInfo.setTotalNumberOfMeasureToRead(
segmentProperties.getMeasuresOrdinalToChunkMapping().size());
blockExecutionInfo.setComplexDimensionInfoMap(
QueryUtil.getComplexDimensionsMap(
projectDimensions,
segmentProperties.getDimensionOrdinalToChunkMapping(),
queryProperties.complexFilterDimension));
if (null != queryModel.getIndexFilter()) {
FilterResolverIntf filterResolverIntf;
if (!filePath.startsWith(queryModel.getTable().getTablePath())) {
filterResolverIntf = queryModel.getIndexFilter().getExternalSegmentResolver();
} else {
// loading the filter executor tree for filter evaluation
filterResolverIntf = queryModel.getIndexFilter().getResolver();
}
blockExecutionInfo.setFilterExecutorTree(
FilterUtil.getFilterExecutorTree(filterResolverIntf, segmentProperties,
blockExecutionInfo.getComplexDimensionInfoMap(), false));
}
// expression measure
List<CarbonMeasure> expressionMeasures =
new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// setting all the dimension chunk indexes to be read from file
int numberOfElementToConsider = 0;
// list of dimensions to be projected
Set<Integer> allProjectionListDimensionIndexes = new LinkedHashSet<>();
// create a list of filter dimensions present in the current block
Set<CarbonDimension> currentBlockFilterDimensions =
getCurrentBlockFilterDimensions(queryProperties.complexFilterDimension, segmentProperties);
int[] dimensionChunkIndexes = QueryUtil.getDimensionChunkIndexes(projectDimensions,
segmentProperties.getDimensionOrdinalToChunkMapping(),
currentBlockFilterDimensions, allProjectionListDimensionIndexes);
ReusableDataBuffer[] dimensionBuffer = new ReusableDataBuffer[projectDimensions.size()];
for (int i = 0; i < dimensionBuffer.length; i++) {
dimensionBuffer[i] = new ReusableDataBuffer();
}
blockExecutionInfo.setDimensionReusableDataBuffer(dimensionBuffer);
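// number of column chunks that will be grouped into a single IO read, configurable
// through the carbon property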
int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULT_VALUE));
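// exclude the last dimension chunk index from the range grouping when it refers to the
// last dimension column of the block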
if (dimensionChunkIndexes.length > 0) {
numberOfElementToConsider = dimensionChunkIndexes[dimensionChunkIndexes.length - 1]
== segmentProperties.getBlockToDimensionOrdinalMapping().size() - 1 ?
dimensionChunkIndexes.length - 1 :
dimensionChunkIndexes.length;
blockExecutionInfo.setAllSelectedDimensionColumnIndexRange(
CarbonUtil.getRangeIndex(dimensionChunkIndexes, numberOfElementToConsider,
numberOfColumnToBeReadInOneIO));
} else {
blockExecutionInfo.setAllSelectedDimensionColumnIndexRange(new int[0][0]);
}
// get the list of updated filter measures present in the current block
Set<CarbonMeasure> filterMeasures =
getCurrentBlockFilterMeasures(queryProperties.filterMeasures, segmentProperties);
// list of measures to be projected
List<Integer> allProjectionListMeasureIndexes = new ArrayList<>();
int[] measureChunkIndexes = QueryUtil.getMeasureChunkIndexes(
projectionMeasures, expressionMeasures,
segmentProperties.getMeasuresOrdinalToChunkMapping(), filterMeasures,
allProjectionListMeasureIndexes);
ReusableDataBuffer[] measureBuffer =
new ReusableDataBuffer[allProjectionListMeasureIndexes.size()];
for (int i = 0; i < measureBuffer.length; i++) {
measureBuffer[i] = new ReusableDataBuffer();
}
blockExecutionInfo.setMeasureReusableDataBuffer(measureBuffer);
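// as with the dimensions above, exclude the last measure chunk index from the range
// grouping when it refers to the last measure of the block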
if (measureChunkIndexes.length > 0) {
numberOfElementToConsider = measureChunkIndexes[measureChunkIndexes.length - 1]
== segmentProperties.getMeasures().size() - 1 ?
measureChunkIndexes.length - 1 :
measureChunkIndexes.length;
// setting all the measure chunk indexes to be read from file
blockExecutionInfo.setAllSelectedMeasureIndexRange(
CarbonUtil.getRangeIndex(
measureChunkIndexes, numberOfElementToConsider,
numberOfColumnToBeReadInOneIO));
} else {
blockExecutionInfo.setAllSelectedMeasureIndexRange(new int[0][0]);
}
// setting the indexes of list of dimension in projection list
blockExecutionInfo.setProjectionListDimensionIndexes(ArrayUtils.toPrimitive(
allProjectionListDimensionIndexes
.toArray(new Integer[allProjectionListDimensionIndexes.size()])));
// setting the indexes of list of measures in projection list
blockExecutionInfo.setProjectionListMeasureIndexes(ArrayUtils.toPrimitive(
allProjectionListMeasureIndexes
.toArray(new Integer[allProjectionListMeasureIndexes.size()])));
// setting the size of fixed key column (dictionary column)
blockExecutionInfo
.setFixedLengthKeySize(getKeySize(projectDimensions, segmentProperties));
List<Integer> dictionaryColumnChunkIndex = new ArrayList<Integer>();
List<Integer> noDictionaryColumnChunkIndex = new ArrayList<Integer>();
// get the block index to be read from file for query dimension
// for both dictionary columns and no dictionary columns
QueryUtil.fillQueryDimensionChunkIndexes(projectDimensions,
segmentProperties.getDimensionOrdinalToChunkMapping(), dictionaryColumnChunkIndex,
noDictionaryColumnChunkIndex);
int[] queryDictionaryColumnChunkIndexes = ArrayUtils.toPrimitive(
dictionaryColumnChunkIndex.toArray(new Integer[dictionaryColumnChunkIndex.size()]));
// the dictionary columns need to be sorted, as for all dimension
// columns the key will be filled based on key order
if (!queryModel.isForcedDetailRawQuery()) {
Arrays.sort(queryDictionaryColumnChunkIndexes);
}
blockExecutionInfo.setDictionaryColumnChunkIndex(queryDictionaryColumnChunkIndexes);
// setting the no dictionary column block indexes
blockExecutionInfo.setNoDictionaryColumnChunkIndexes(ArrayUtils.toPrimitive(
noDictionaryColumnChunkIndex.toArray(new Integer[noDictionaryColumnChunkIndex.size()])));
blockExecutionInfo.setComplexColumnParentBlockIndexes(
getComplexDimensionParentBlockIndexes(projectDimensions));
blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader());
DataTypeUtil.setDataTypeConverter(queryModel.getConverter());
blockExecutionInfo.setRequiredRowId(queryModel.isRequiredRowId());
return blockExecutionInfo;
}
/**
 * Returns the fixed key length size, which will be used
 * to create a row from the column chunks
 *
 * @param queryDimension query dimensions
 * @param blockMetadataInfo block metadata info
 * @return key size
 */
private int getKeySize(List<ProjectionDimension> queryDimension,
SegmentProperties blockMetadataInfo) {
// add the dimension block ordinal for each dictionary column
// existing in the current block dimensions. A set is used because in case of column groups
// the ordinals of the columns in a column group will be the same
Set<Integer> fixedLengthDimensionOrdinal =
new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
int counter = 0;
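// walk the projection dimensions: advance past the children of complex dimensions, count
// non-DATE dimensions as variable length, and collect the chunk ordinal of each DATE
// (fixed length) dimension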
while (counter < queryDimension.size()) {
if (queryDimension.get(counter).getDimension().getNumberOfChild() > 0) {
counter += queryDimension.get(counter).getDimension().getNumberOfChild();
} else if (queryDimension.get(counter).getDimension().getDataType() != DataTypes.DATE) {
counter++;
} else {
fixedLengthDimensionOrdinal.add(blockMetadataInfo.getDimensionOrdinalToChunkMapping()
.get(queryDimension.get(counter).getDimension().getOrdinal()));
counter++;
}
}
int[] dictionaryColumnOrdinal = ArrayUtils.toPrimitive(
fixedLengthDimensionOrdinal.toArray(new Integer[fixedLengthDimensionOrdinal.size()]));
// calculate the size of existing query dictionary columns in this block
if (blockMetadataInfo.getNumberOfDictDimensions() > 0) {
int[] eachColumnValueSize = blockMetadataInfo.createDimColumnValueLength();
int keySize = 0;
for (int i = 0; i < dictionaryColumnOrdinal.length; i++) {
keySize += eachColumnValueSize[dictionaryColumnOrdinal[i]];
}
return keySize;
}
return 0;
}
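/**
 * Collects the parent block ordinals of the complex dimensions present in the projection
 */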
private int[] getComplexDimensionParentBlockIndexes(List<ProjectionDimension> queryDimensions) {
List<Integer> parentBlockIndexList = new ArrayList<Integer>();
for (ProjectionDimension queryDimension : queryDimensions) {
if (queryDimension.getDimension().getDataType().isComplexType()) {
if (null != queryDimension.getDimension().getComplexParentDimension()) {
if (queryDimension.getDimension().isComplex()) {
parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
} else {
parentBlockIndexList.add(queryDimension.getParentDimension().getOrdinal());
}
} else {
parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
}
}
}
return ArrayUtils
.toPrimitive(parentBlockIndexList.toArray(new Integer[parentBlockIndexList.size()]));
}
/**
 * This method will create the updated set of filter measures present in the current block
 *
 * @param queryFilterMeasures measures used in the query filter
 * @param segmentProperties segment properties of the current block
 * @return filter measures resolved against the current block
 */
private Set<CarbonMeasure> getCurrentBlockFilterMeasures(Set<CarbonMeasure> queryFilterMeasures,
SegmentProperties segmentProperties) {
if (!queryFilterMeasures.isEmpty()) {
Set<CarbonMeasure> updatedFilterMeasures = new HashSet<>(queryFilterMeasures.size());
for (CarbonMeasure queryMeasure : queryFilterMeasures) {
CarbonMeasure measureFromCurrentBlock =
segmentProperties.getMeasureFromCurrentBlock(queryMeasure);
if (null != measureFromCurrentBlock) {
updatedFilterMeasures.add(measureFromCurrentBlock);
}
}
return updatedFilterMeasures;
} else {
return queryFilterMeasures;
}
}
/**
 * This method will create the updated set of filter dimensions present in the current block
 *
 * @param queryFilterDimensions dimensions used in the query filter
 * @param segmentProperties segment properties of the current block
 * @return filter dimensions resolved against the current block
 */
private Set<CarbonDimension> getCurrentBlockFilterDimensions(
Set<CarbonDimension> queryFilterDimensions, SegmentProperties segmentProperties) {
if (!queryFilterDimensions.isEmpty()) {
Set<CarbonDimension> updatedFilterDimensions = new HashSet<>(queryFilterDimensions.size());
for (CarbonDimension queryDimension : queryFilterDimensions) {
CarbonDimension dimensionFromCurrentBlock =
segmentProperties.getDimensionFromCurrentBlock(queryDimension);
if (null != dimensionFromCurrentBlock) {
updatedFilterDimensions.add(dimensionFromCurrentBlock);
}
}
return updatedFilterDimensions;
} else {
return queryFilterDimensions;
}
}
/**
 * Below method will be used to finish the execution and release the resources
 *
 * @throws QueryExecutionException
 */
@Override
public void finish() throws QueryExecutionException {
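// release the data blocks cached for this query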
CarbonUtil.clearBlockCache(queryProperties.dataBlocks);
Throwable exceptionOccurred = null;
if (null != queryIterator) {
// catch any exception so that it can be rethrown after clearing all the resources;
// otherwise, if an exception is thrown from this point, the executor service will not
// be terminated
try {
queryIterator.close();
} catch (Throwable e) {
exceptionOccurred = e;
}
}
// clear all the unsafe memory used for the given task ID, but only if it needs to be cleared
if (freeUnsafeMemory) {
UnsafeMemoryManager.INSTANCE
.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
ThreadLocalTaskInfo.clearCarbonTaskInfo();
}
if (null != queryProperties.executorService) {
// In case of a limit query, once the required number of records is found the executors
// must stop all running execution; otherwise they will keep running and will hurt
// the query performance.
queryProperties.executorService.shutdownNow();
}
// if any exception occurred, rethrow it
if (null != exceptionOccurred) {
throw new QueryExecutionException(exceptionOccurred);
}
DataTypeUtil.clearFormatter();
}
}