blob: 49228eb5825b01e99f41f040a83adc5e98a7e08c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.scan.collector.impl;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.model.QueryDimension;
import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.result.AbstractScannedResult;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.commons.lang3.ArrayUtils;
/**
* It is not a collector it is just a scanned result holder.
*/
public class DictionaryBasedResultCollector extends AbstractScannedResultCollector {
protected QueryDimension[] queryDimensions;
protected QueryMeasure[] queryMeasures;
protected DirectDictionaryGenerator[] directDictionaryGenerators;
/**
* query order
*/
protected int[] order;
protected int[] actualIndexInSurrogateKey;
protected boolean[] dictionaryEncodingArray;
protected boolean[] directDictionaryEncodingArray;
protected boolean[] implictColumnArray;
protected boolean[] complexDataTypeArray;
protected int dictionaryColumnIndex;
protected int noDictionaryColumnIndex;
protected int complexTypeColumnIndex;
protected boolean isDimensionExists;
protected Map<Integer, GenericQueryType> comlexDimensionInfoMap;
public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
initDimensionAndMeasureIndexesForFillingData();
isDimensionExists = queryDimensions.length > 0;
this.comlexDimensionInfoMap = tableBlockExecutionInfos.getComlexDimensionInfoMap();
}
/**
* This method will add a record both key and value to list object
* it will keep track of how many record is processed, to handle limit scenario
*/
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
// scan the record and add to list
List<Object[]> listBasedResult = new ArrayList<>(batchSize);
int rowCounter = 0;
int[] surrogateResult;
byte[][] noDictionaryKeys;
byte[][] complexTypeKeyArray;
BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
scannedResult.getDeleteDeltaDataCache();
while (scannedResult.hasNext() && rowCounter < batchSize) {
Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionExists) {
surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
noDictionaryKeys = scannedResult.getNoDictionaryKeyArray();
complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
dictionaryColumnIndex = 0;
noDictionaryColumnIndex = 0;
complexTypeColumnIndex = 0;
for (int i = 0; i < queryDimensions.length; i++) {
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
comlexDimensionInfoMap, row, i);
}
} else {
scannedResult.incrementCounter();
}
if (null != deleteDeltaDataCache && deleteDeltaDataCache
.contains(scannedResult.getCurrentRowId())) {
continue;
}
fillMeasureData(scannedResult, row);
listBasedResult.add(row);
rowCounter++;
}
return listBasedResult;
}
protected void fillDimensionData(AbstractScannedResult scannedResult, int[] surrogateResult,
byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray,
Map<Integer, GenericQueryType> comlexDimensionInfoMap, Object[] row, int i) {
if (!dictionaryEncodingArray[i]) {
if (implictColumnArray[i]) {
if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
.equals(queryDimensions[i].getDimension().getColName())) {
row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
.getCurrentRowId(), DataType.STRING);
} else {
row[order[i]] =
DataTypeUtil.getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
}
} else {
row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
noDictionaryKeys[noDictionaryColumnIndex++],
queryDimensions[i].getDimension().getDataType());
}
} else if (directDictionaryEncodingArray[i]) {
if (directDictionaryGenerators[i] != null) {
row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
}
} else if (complexDataTypeArray[i]) {
row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
.getDataBasedOnDataTypeFromSurrogates(
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
dictionaryColumnIndex++;
} else {
row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
}
}
protected void fillMeasureData(AbstractScannedResult scannedResult, Object[] row) {
if (measureInfo.getMeasureDataTypes().length > 0) {
Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
fillMeasureData(msrValues, 0, scannedResult);
for (int i = 0; i < msrValues.length; i++) {
row[order[i + queryDimensions.length]] = msrValues[i];
}
}
}
protected void initDimensionAndMeasureIndexesForFillingData() {
List<Integer> dictionaryIndexes = new ArrayList<Integer>();
for (int i = 0; i < queryDimensions.length; i++) {
if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
}
}
int[] primitive =
ArrayUtils.toPrimitive(dictionaryIndexes.toArray(new Integer[dictionaryIndexes.size()]));
Arrays.sort(primitive);
actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
int index = 0;
for (int i = 0; i < queryDimensions.length; i++) {
if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
actualIndexInSurrogateKey[index++] =
Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
}
}
dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
directDictionaryEncodingArray = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
order = new int[queryDimensions.length + queryMeasures.length];
for (int i = 0; i < queryDimensions.length; i++) {
order[i] = queryDimensions[i].getQueryOrder();
}
for (int i = 0; i < queryMeasures.length; i++) {
order[i + queryDimensions.length] = queryMeasures[i].getQueryOrder();
}
directDictionaryGenerators = new DirectDictionaryGenerator[queryDimensions.length];
for (int i = 0; i < queryDimensions.length; i++) {
directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
}
}
}