// blob: a77f7944ce17456f3d29391b269619ddd2a9c31c
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.scan.collector.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.result.BlockletScannedResult;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
/**
 * It is not a collector, it is just a scanned result holder.
 *
 * <p>After the inherited raw scan has filled a batch of rows, this class rewrites the
 * dictionary and no-dictionary key arrays held by the {@link ByteArrayWrapper} in
 * {@code row[0]} so that projected dimensions flagged as not existing in the scanned block
 * get a default value in the key arrays.
 * NOTE(review): presumably those are columns added to the table after the block was
 * written (restructure scenario) - confirm against the code that populates dimensionInfo.
 */
public class RestructureBasedRawResultCollector extends RawBasedResultCollector {

  public RestructureBasedRawResultCollector(BlockExecutionInfo blockExecutionInfos) {
    super(blockExecutionInfos);
  }

  /**
   * Scans up to {@code batchSize} records from the given result into a list of rows and then
   * re-fills the dictionary / no-dictionary key arrays of every row for the dimensions that
   * were added. The elapsed time is added to the RESULT_PREP_TIME statistic.
   *
   * @param scannedResult scanned blocklet result to read the records from
   * @param batchSize     batch size handed to the scan; also used as the initial list capacity
   * @return the collected rows; the first element of each row is cast to
   *         {@link ByteArrayWrapper} when key arrays are re-filled
   */
  @Override
  public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, int batchSize) {
    long startTime = System.currentTimeMillis();
    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
    ProjectionMeasure[] queryMeasures = executionInfo.getActualQueryMeasures();
    // scan the record and add to list
    scanAndFillData(scannedResult, batchSize, listBasedResult, queryMeasures);
    // re-fill dictionary and no dictionary key arrays for the newly added columns
    if (dimensionInfo.isDictionaryColumnAdded()) {
      fillDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
    }
    if (dimensionInfo.isNoDictionaryColumnAdded()) {
      fillNoDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
    }
    QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
        .get(QueryStatisticsConstants.RESULT_PREP_TIME);
    // accumulate: previous count plus the milliseconds spent in this call
    resultPrepTime.addCountStatistic(QueryStatisticsConstants.RESULT_PREP_TIME,
        resultPrepTime.getCount() + (System.currentTimeMillis() - startTime));
    return listBasedResult;
  }

  /**
   * Rebuilds the dictionary key of every row so that it also carries a value for the newly
   * added dictionary columns. Only dimensions whose data type is DATE take part in the
   * dictionary key; for each of them the existing key value is copied when the dimension
   * exists in the block, otherwise the configured default (or the default surrogate key when
   * no default is configured) is written.
   *
   * @param rows rows whose first element is the {@link ByteArrayWrapper} to update in place
   */
  private void fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
    if (rows.isEmpty()) {
      // nothing to rewrite; also avoids touching the block metadata for an empty batch
      return;
    }
    // everything below is identical for all rows of the batch, so it is looked up only once
    ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
    boolean[] dimensionExists = dimensionInfo.getDimensionExists();
    Object[] defaultValues = dimensionInfo.getDefaultValues();
    int newDictionaryColumnCount = dimensionInfo.getNewDictionaryColumnCount();
    boolean blockHasDictionaryDimensions =
        executionInfo.getDataBlock().getSegmentProperties().getNumberOfDictDimensions() > 0;

    for (Object[] row : rows) {
      ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
      long[] keyArray = null;
      int newKeyArrayLength = newDictionaryColumnCount;
      if (blockHasDictionaryDimensions) {
        // the existing key is only decoded when the block has dictionary dimensions
        keyArray = ByteUtil.convertBytesToLongArray(byteArrayWrapper.getDictionaryKey());
        newKeyArrayLength += keyArray.length;
      }
      long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength];
      int existingColumnKeyArrayIndex = 0;
      int newKeyArrayIndex = 0;
      for (int i = 0; i < dimensionExists.length; i++) {
        if (actualQueryDimensions[i].getDimension().getDataType() != DataTypes.DATE) {
          continue;
        }
        // if dimension exists then add the key array value else add the default value
        if (dimensionExists[i] && null != keyArray && 0 != keyArray.length) {
          keyArrayWithNewAddedColumns[newKeyArrayIndex++] =
              keyArray[existingColumnKeyArrayIndex++];
        } else {
          long defaultValueAsLong;
          Object defaultValue = defaultValues[i];
          if (null != defaultValue) {
            defaultValueAsLong = ((Integer) defaultValue).longValue();
          } else {
            defaultValueAsLong = (long) CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
          }
          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
        }
      }
      byteArrayWrapper.setDictionaryKey(
          ByteUtil.convertLongArrayToBytes(keyArrayWithNewAddedColumns));
    }
  }

  /**
   * Rebuilds the no-dictionary keys of every row so that they also carry a value for the
   * newly added no-dictionary columns. Dimensions of type DATE and dimensions with IMPLICIT
   * encoding are skipped; for every other dimension the existing bytes are copied when the
   * dimension exists in the block, otherwise a default value is written.
   *
   * @param rows rows whose first element is the {@link ByteArrayWrapper} to update in place
   */
  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
    if (rows.isEmpty()) {
      return;
    }
    // row-invariant lookups, fetched once for the whole batch
    ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
    boolean[] dimensionExists = dimensionInfo.getDimensionExists();
    Object[] defaultValues = dimensionInfo.getDefaultValues();
    int newNoDictionaryColumnCount = dimensionInfo.getNewNoDictionaryColumnCount();

    for (Object[] row : rows) {
      ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
      byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
      byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
          new byte[noDictKeyArray.length + newNoDictionaryColumnCount][];
      int existingColumnValueIndex = 0;
      int newKeyArrayIndex = 0;
      for (int i = 0; i < dimensionExists.length; i++) {
        ProjectionDimension queryDimension = actualQueryDimensions[i];
        if (queryDimension.getDimension().getDataType() == DataTypes.DATE
            || queryDimension.getDimension().hasEncoding(Encoding.IMPLICIT)) {
          continue;
        }
        // if dimension exists then add the byte array value else add the default value
        if (dimensionExists[i]) {
          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
              noDictKeyArray[existingColumnValueIndex++];
        } else {
          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
              resolveNoDictionaryDefaultValue(defaultValues[i], queryDimension);
        }
      }
      byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
    }
  }

  /**
   * Picks the bytes written for a no-dictionary dimension that does not exist in the block:
   * the configured default when present, otherwise the converted member-default array for
   * STRING dimensions and an empty byte array for every other data type.
   *
   * @param defaultValue   configured default for the dimension, may be {@code null}
   * @param queryDimension the projected dimension the value is resolved for
   * @return the bytes to store for the missing dimension
   */
  private byte[] resolveNoDictionaryDefaultValue(Object defaultValue,
      ProjectionDimension queryDimension) {
    if (null != defaultValue) {
      return (byte[]) defaultValue;
    }
    if (queryDimension.getDimension().getDataType() == DataTypes.STRING) {
      return DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
    }
    return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
  }
}