/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.scan.collector.impl;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.complextypes.StructQueryType;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.result.BlockletScannedResult;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;

import org.apache.commons.lang3.ArrayUtils;

/**
 * It is not a collector; it is just a scanned result holder.
 */
public class DictionaryBasedResultCollector extends AbstractScannedResultCollector {
protected ProjectionDimension[] queryDimensions;
protected ProjectionMeasure[] queryMeasures;
private DirectDictionaryGenerator[] directDictionaryGenerators;
/**
 * position of each projected column in the output row (query order)
 */
protected int[] order;
private int[] actualIndexInSurrogateKey;
private boolean[] implicitColumnArray;
private boolean[] complexDataTypeArray;
int dictionaryColumnIndex;
int noDictionaryColumnIndex;
int complexTypeColumnIndex;
boolean isDimensionExists;
private int[] surrogateResult;
private byte[][] noDictionaryKeys;
private byte[][] complexTypeKeyArray;
protected Map<Integer, GenericQueryType> complexDimensionInfoMap;
/**
 * Each key of this map is a complex parent column ordinal and the value is the list of its
 * associated child column ordinals.
 * The final projection should be a merged list consisting of only the parents.
 */
private Map<Integer, List<Integer>> parentToChildColumnsMap = new HashMap<>();
/**
 * List holding the complex parent ordinal of each query dimension (-1 for non-complex columns)
 */
private List<Integer> queryDimensionToComplexParentOrdinal = new ArrayList<>();
/**
 * Map from a complex parent ordinal to the child column dimensions of that parent and
 * the corresponding data buffer of each child column.
 */
private Map<Integer, Map<CarbonDimension, ByteBuffer>> mergedComplexDimensionIndex =
new HashMap<>();
private boolean readOnlyDelta;
public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
queryDimensions = executionInfo.getProjectionDimensions();
queryMeasures = executionInfo.getProjectionMeasures();
initDimensionAndMeasureIndexesForFillingData();
isDimensionExists = queryDimensions.length > 0;
this.complexDimensionInfoMap = executionInfo.getComplexDimensionInfoMap();
this.readOnlyDelta = executionInfo.isReadOnlyDelta();
}
/**
 * This method adds a record (both key and value) to the list object.
 * It keeps track of how many records are processed, to handle the limit scenario.
 */
@Override
public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, int batchSize) {
// scan the record and add to list
List<Object[]> listBasedResult = new ArrayList<>(batchSize);
int rowCounter = 0;
boolean isStructQueryType = false;
for (Object obj : scannedResult.complexParentIndexToQueryMap.values()) {
if (obj instanceof StructQueryType) {
// if any one of the map elements contains a struct, rows containing null need to be shifted.
isStructQueryType = true;
break;
}
}
boolean[] isComplexChildColumn = null;
if (isStructQueryType) {
// need to identify complex child columns for shifting rows if they contain null
isComplexChildColumn = new boolean[queryDimensions.length + queryMeasures.length];
for (ProjectionDimension dimension : queryDimensions) {
if (null != dimension.getDimension().getComplexParentDimension()) {
isComplexChildColumn[dimension.getOrdinal()] = true;
}
}
}
while (scannedResult.hasNext() && rowCounter < batchSize) {
scannedResult.incrementCounter();
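// delete delta handling: when reading only the delta, skip rows that are not deleted
// (if a delete delta exists); otherwise skip rows that are marked as deleted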
if (readOnlyDelta) {
if (!scannedResult.containsDeletedRow(scannedResult.getCurrentRowId()) &&
scannedResult.getCurrentDeleteDeltaVo() != null) {
continue;
}
} else {
if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
continue;
}
}
Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionExists) {
surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
noDictionaryKeys = scannedResult.getNoDictionaryKeyArray();
complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
dictionaryColumnIndex = 0;
noDictionaryColumnIndex = 0;
complexTypeColumnIndex = 0;
// get the complex columns data of this row
fillComplexColumnDataBufferForThisRow();
for (int i = 0; i < queryDimensions.length; i++) {
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
complexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal());
}
}
fillMeasureData(scannedResult, row);
if (isStructQueryType) {
shiftNullForStruct(row, isComplexChildColumn);
}
listBasedResult.add(row);
rowCounter++;
}
return listBasedResult;
}
/**
 * shift the nulls of complex (struct) columns to the end of the row
 *
 * @param row row data
 * @param isComplexChildColumn flags marking which row positions hold complex child columns
 */
private void shiftNullForStruct(Object[] row, boolean[] isComplexChildColumn) {
int count = 0;
// If a : <b,c> and d : <e,f> are two structs and a.b, a.c, d.e are given in the
// projection list, then the object array will contain a, null, d as the result, because for
// a.b, a is filled and for a.c null is placed.
// Instead, place the nulls at the end of the object array and send a, d, null as the result.
for (int j = 0; j < row.length; j++) {
if (null == row[j] && !isComplexChildColumn[j]) {
// if it is a primitive column, don't shift the null to the end.
row[count++] = null;
} else if (null != row[j]) {
row[count++] = row[j];
}
}
// fill the skipped content
while (count < row.length) row[count++] = null;
}
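/**
 * For the current row, collect the raw bytes of each projected complex child column and
 * group them by the ordinal of their complex parent, so that the parent object can later be
 * assembled from the merged child buffers.
 */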
private void fillComplexColumnDataBufferForThisRow() {
mergedComplexDimensionIndex.clear();
int noDictionaryComplexColumnIndex = 0;
int complexTypeComplexColumnIndex = 0;
for (int i = 0; i < queryDimensions.length; i++) {
int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
if (complexParentOrdinal != -1) {
Map<CarbonDimension, ByteBuffer> childColumnByteBuffer;
// get or create the child column buffer map of this parent from mergedComplexDimensionIndex
if (mergedComplexDimensionIndex.get(complexParentOrdinal) == null) {
childColumnByteBuffer = new HashMap<>();
} else {
childColumnByteBuffer = mergedComplexDimensionIndex.get(complexParentOrdinal);
}
// send the byte buffer for the complex columns. Currently the expected columns for
// complex types are
// a) Complex columns
// b) No dictionary columns
// TODO: has to be filled for dictionary columns once the support for push down into
// complex dictionary columns comes in.
ByteBuffer buffer;
if (queryDimensions[i].getDimension().getDataType() != DataTypes.DATE) {
if (implicitColumnArray[i]) {
throw new RuntimeException("Not Supported Column Type");
} else if (complexDataTypeArray[i]) {
buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
} else {
buffer = ByteBuffer.wrap(noDictionaryKeys[noDictionaryComplexColumnIndex++]);
}
} else if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
throw new RuntimeException("Direct Dictionary Column Type Not Supported Yet.");
} else if (complexDataTypeArray[i]) {
buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
} else {
throw new RuntimeException("Not Supported Column Type");
}
childColumnByteBuffer
.put(queryDimensions[i].getDimension(), buffer);
mergedComplexDimensionIndex.put(complexParentOrdinal, childColumnByteBuffer);
} else if (!queryDimensions[i].getDimension().isComplex()) {
// if the dimension is not a complex column, increment the no dictionary complex column index
noDictionaryComplexColumnIndex++;
}
}
}
/**
 * fill the data of the dimension columns into the row
 *
 * @param scannedResult scanned result of the current blocklet
 * @param surrogateResult dictionary surrogate keys of the current row
 * @param noDictionaryKeys no dictionary key bytes of the current row
 * @param complexTypeKeyArray complex type key bytes of the current row
 * @param complexDimensionInfoMap map from complex dimension ordinal to its query type
 * @param row row data
 * @param i dimension column index in the projection
 * @param actualOrdinal the actual ordinal of the dimension column in the segment
 */
void fillDimensionData(BlockletScannedResult scannedResult, int[] surrogateResult,
byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray,
Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i,
int actualOrdinal) {
if (queryDimensions[i].getDimension().getDataType() != DataTypes.DATE) {
if (implicitColumnArray[i]) {
if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
.equals(queryDimensions[i].getColumnName())) {
row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
.getCurrentPageCounter() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
.getCurrentRowId(), DataTypes.STRING);
} else {
row[order[i]] =
DataTypeUtil.getDataBasedOnDataType(scannedResult.getBlockletId(), DataTypes.STRING);
}
} else if (complexDataTypeArray[i]) {
// Complex Type With No Dictionary Encoding.
if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
fillRow(complexDimensionInfoMap, row, i,
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
} else {
row[order[i]] =
complexDimensionInfoMap.get(actualOrdinal).getDataBasedOnDataType(
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
}
} else {
if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
// when the parent ordinal is not -1, a predicate is being pushed down
// for the complex column.
fillRow(complexDimensionInfoMap, row, i,
ByteBuffer.wrap(noDictionaryKeys[noDictionaryColumnIndex++]));
} else {
row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
noDictionaryKeys[noDictionaryColumnIndex++],
queryDimensions[i].getDimension().getDataType());
}
}
} else if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
if (directDictionaryGenerators[i] != null) {
row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
}
} else if (complexDataTypeArray[i]) {
row[order[i]] = complexDimensionInfoMap.get(actualOrdinal)
.getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
dictionaryColumnIndex++;
} else {
row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
}
}
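/**
 * Fill the row for a complex column: if more than one child of the same parent is projected,
 * the merged parent object is built from the collected child buffers; otherwise the single
 * child is decoded directly from the given buffer.
 */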
private void fillRow(Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i,
ByteBuffer wrap) {
if (parentToChildColumnsMap.get(queryDimensionToComplexParentOrdinal.get(i)).size() > 1) {
fillRowForComplexColumn(complexDimensionInfoMap, row, i);
} else {
row[order[i]] = complexDimensionInfoMap.get(queryDimensionToComplexParentOrdinal.get(i))
.getDataBasedOnColumn(wrap, queryDimensions[i].getDimension().getComplexParentDimension(),
queryDimensions[i].getDimension());
}
}
private void fillRowForComplexColumn(Map<Integer, GenericQueryType> complexDimensionInfoMap,
Object[] row, int i) {
// When multiple child columns are projected, only the first child element builds the
// parent object array. For all other children it should be null.
// For e.g. a : <b,c,d>. Here a is the parent column and b, c, d are the child columns.
// During traversal, when we encounter the first element in the list, i.e. column 'b',
// a is filled completely. When column 'c' or 'd' is encountered,
// only null is placed in the output.
int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
List<Integer> childColumns = parentToChildColumnsMap.get(complexParentOrdinal);
if (childColumns.get(0).equals(queryDimensions[i].getDimension().getOrdinal())) {
// Fill out Parent Column.
row[order[i]] = complexDimensionInfoMap.get(complexParentOrdinal).getDataBasedOnColumnList(
mergedComplexDimensionIndex.get(queryDimensions[i].getParentDimension().getOrdinal()),
queryDimensions[i].getParentDimension());
} else {
row[order[i]] = null;
}
}
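/**
 * Fill the measure values of the current row; measures are placed after the dimension
 * columns according to the projection order.
 */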
void fillMeasureData(BlockletScannedResult scannedResult, Object[] row) {
if (measureInfo.getMeasureDataTypes().length > 0) {
Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
fillMeasureData(msrValues, 0, scannedResult);
for (int i = 0; i < msrValues.length; i++) {
row[order[i + queryDimensions.length]] = msrValues[i];
}
}
}
void initDimensionAndMeasureIndexesForFillingData() {
List<Integer> dictionaryIndexes = new ArrayList<Integer>();
for (int i = 0; i < queryDimensions.length; i++) {
if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
}
}
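// sort the DATE dimension ordinals so that each one can later be mapped to its position
// in the surrogate key array via binary search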
int[] primitive =
ArrayUtils.toPrimitive(dictionaryIndexes.toArray(new Integer[dictionaryIndexes.size()]));
Arrays.sort(primitive);
actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
int index = 0;
implicitColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
parentToChildColumnsMap.clear();
queryDimensionToComplexParentOrdinal.clear();
for (int i = 0; i < queryDimensions.length; i++) {
if (queryDimensions[i].getDimension().getDataType() == DataTypes.DATE) {
actualIndexInSurrogateKey[index++] =
Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
}
if (null != queryDimensions[i].getDimension().getComplexParentDimension()) {
// Add the parent and the child ordinal to the parentToChildColumnsMap
int complexParentOrdinal =
queryDimensions[i].getDimension().getComplexParentDimension().getOrdinal();
queryDimensionToComplexParentOrdinal.add(complexParentOrdinal);
if (parentToChildColumnsMap.get(complexParentOrdinal) == null) {
// Add the parent and child ordinal in the map
List<Integer> childOrdinals = new ArrayList<>();
childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);
} else {
List<Integer> childOrdinals = parentToChildColumnsMap.get(complexParentOrdinal);
childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);
}
} else {
queryDimensionToComplexParentOrdinal.add(-1);
}
}
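// order[i] gives the position of the i-th projected column (dimensions first, then measures)
// in the output row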
order = new int[queryDimensions.length + queryMeasures.length];
for (int i = 0; i < queryDimensions.length; i++) {
order[i] = queryDimensions[i].getOrdinal();
}
for (int i = 0; i < queryMeasures.length; i++) {
order[i + queryDimensions.length] = queryMeasures[i].getOrdinal();
}
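// create a direct dictionary generator per projected dimension; the factory may return null
// for non direct dictionary data types, hence the null check when filling dimension data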
directDictionaryGenerators = new DirectDictionaryGenerator[queryDimensions.length];
for (int i = 0; i < queryDimensions.length; i++) {
directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
}
}
}