| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.carbondata.core.scan.collector.impl; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.carbondata.common.logging.LogService; |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.datastore.block.SegmentProperties; |
| import org.apache.carbondata.core.keygenerator.KeyGenException; |
| import org.apache.carbondata.core.keygenerator.KeyGenerator; |
| import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator; |
| import org.apache.carbondata.core.metadata.encoder.Encoding; |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; |
| import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; |
| import org.apache.carbondata.core.scan.model.QueryDimension; |
| import org.apache.carbondata.core.scan.model.QueryMeasure; |
| import org.apache.carbondata.core.scan.result.AbstractScannedResult; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| |
| import org.apache.commons.lang3.ArrayUtils; |
| import org.apache.spark.unsafe.types.UTF8String; |
| |
| /** |
| * It is not a collector it is just a scanned result holder. |
| */ |
| public class RestructureBasedRawResultCollector extends RawBasedResultCollector { |
| |
| /** |
| * logger |
| */ |
| private static final LogService LOGGER = |
| LogServiceFactory.getLogService(RestructureBasedRawResultCollector.class.getName()); |
| |
| /** |
| * Key generator which will form the mdKey according to latest schema |
| */ |
| private KeyGenerator restructuredKeyGenerator; |
| |
| /** |
| * Key generator for uncompressing current block values |
| */ |
| private KeyGenerator updatedCurrentBlockKeyGenerator; |
| |
| public RestructureBasedRawResultCollector(BlockExecutionInfo blockExecutionInfos) { |
| super(blockExecutionInfos); |
| initRestructuredKeyGenerator(); |
| initCurrentBlockKeyGenerator(); |
| } |
| |
| /** |
| * This method will create a new key generator for generating mdKey according to latest schema |
| */ |
| private void initRestructuredKeyGenerator() { |
| SegmentProperties segmentProperties = |
| tableBlockExecutionInfos.getDataBlock().getSegmentProperties(); |
| QueryDimension[] queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions(); |
| List<Integer> updatedColumnCardinality = new ArrayList<>(queryDimensions.length); |
| List<Integer> updatedDimensionPartitioner = new ArrayList<>(queryDimensions.length); |
| int[] dictionaryColumnBlockIndex = tableBlockExecutionInfos.getDictionaryColumnBlockIndex(); |
| int dimCounterInCurrentBlock = 0; |
| for (int i = 0; i < queryDimensions.length; i++) { |
| if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) { |
| if (tableBlockExecutionInfos.getDimensionInfo().getDimensionExists()[i]) { |
| // get the dictionary key ordinal as column cardinality in segment properties |
| // will only be for dictionary encoded columns |
| CarbonDimension currentBlockDimension = segmentProperties.getDimensions() |
| .get(dictionaryColumnBlockIndex[dimCounterInCurrentBlock]); |
| updatedColumnCardinality.add( |
| segmentProperties.getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()]); |
| updatedDimensionPartitioner.add( |
| segmentProperties.getDimensionPartitions()[currentBlockDimension.getKeyOrdinal()]); |
| dimCounterInCurrentBlock++; |
| } else { |
| // partitioner index will be 1 every column will be in columnar format |
| updatedDimensionPartitioner.add(1); |
| // for direct dictionary 4 bytes need to be allocated else 1 |
| if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { |
| updatedColumnCardinality.add(Integer.MAX_VALUE); |
| } else { |
| // cardinality will be 2 will user has provided a default value |
| byte[] defaultValue = queryDimensions[i].getDimension().getDefaultValue(); |
| if (null != defaultValue) { |
| updatedColumnCardinality |
| .add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1); |
| } else { |
| updatedColumnCardinality.add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY); |
| } |
| } |
| } |
| } |
| } |
| if (!updatedColumnCardinality.isEmpty()) { |
| int[] latestColumnCardinality = ArrayUtils.toPrimitive( |
| updatedColumnCardinality.toArray(new Integer[updatedColumnCardinality.size()])); |
| int[] latestColumnPartitioner = ArrayUtils.toPrimitive( |
| updatedDimensionPartitioner.toArray(new Integer[updatedDimensionPartitioner.size()])); |
| int[] dimensionBitLength = |
| CarbonUtil.getDimensionBitLength(latestColumnCardinality, latestColumnPartitioner); |
| restructuredKeyGenerator = new MultiDimKeyVarLengthGenerator(dimensionBitLength); |
| } |
| } |
| |
| /** |
| * This method will initialize the block key generator for the current block based on the |
| * dictionary columns present in the current block |
| */ |
| private void initCurrentBlockKeyGenerator() { |
| SegmentProperties segmentProperties = |
| tableBlockExecutionInfos.getDataBlock().getSegmentProperties(); |
| int[] dictionaryColumnBlockIndex = tableBlockExecutionInfos.getDictionaryColumnBlockIndex(); |
| int[] updatedColumnCardinality = new int[dictionaryColumnBlockIndex.length]; |
| int[] updatedDimensionPartitioner = new int[dictionaryColumnBlockIndex.length]; |
| for (int i = 0; i < dictionaryColumnBlockIndex.length; i++) { |
| // get the dictionary key ordinal as column cardinality in segment properties |
| // will only be for dictionary encoded columns |
| CarbonDimension currentBlockDimension = |
| segmentProperties.getDimensions().get(dictionaryColumnBlockIndex[i]); |
| updatedColumnCardinality[i] = |
| segmentProperties.getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()]; |
| updatedDimensionPartitioner[i] = |
| segmentProperties.getDimensionPartitions()[currentBlockDimension.getKeyOrdinal()]; |
| } |
| if (dictionaryColumnBlockIndex.length > 0) { |
| int[] dimensionBitLength = |
| CarbonUtil.getDimensionBitLength(updatedColumnCardinality, updatedDimensionPartitioner); |
| updatedCurrentBlockKeyGenerator = new MultiDimKeyVarLengthGenerator(dimensionBitLength); |
| } |
| } |
| |
| /** |
| * This method will add a record both key and value to list object |
| * it will keep track of how many record is processed, to handle limit scenario |
| */ |
| @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) { |
| List<Object[]> listBasedResult = new ArrayList<>(batchSize); |
| QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures(); |
| BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = |
| scannedResult.getDeleteDeltaDataCache(); |
| // scan the record and add to list |
| int rowCounter = 0; |
| while (scannedResult.hasNext() && rowCounter < batchSize) { |
| scanResultAndGetData(scannedResult); |
| if (null != deleteDeltaDataCache && deleteDeltaDataCache |
| .contains(scannedResult.getCurrentRowId())) { |
| continue; |
| } |
| // re-fill dictionary and no dictionary key arrays for the newly added columns |
| if (dimensionInfo.isDictionaryColumnAdded()) { |
| dictionaryKeyArray = fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray); |
| } |
| if (dimensionInfo.isNoDictionaryColumnAdded()) { |
| noDictionaryKeyArray = fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray); |
| } |
| prepareRow(scannedResult, listBasedResult, queryMeasures); |
| rowCounter++; |
| } |
| return listBasedResult; |
| } |
| |
| /** |
| * This method will fill the dictionary key array with newly added dictionary columns if any |
| * |
| * @param dictionaryKeyArray |
| * @return |
| */ |
| private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) { |
| QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions(); |
| int newKeyArrayLength = dimensionInfo.getNewDictionaryColumnCount(); |
| long[] keyArray = null; |
| if (null != updatedCurrentBlockKeyGenerator) { |
| keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictionaryKeyArray); |
| newKeyArrayLength += keyArray.length; |
| } |
| long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength]; |
| int existingColumnKeyArrayIndex = 0; |
| int newKeyArrayIndex = 0; |
| for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) { |
| if (CarbonUtil |
| .hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(), Encoding.DICTIONARY)) { |
| // if dimension exists then add the key array value else add the default value |
| if (dimensionInfo.getDimensionExists()[i]) { |
| keyArrayWithNewAddedColumns[newKeyArrayIndex++] = keyArray[existingColumnKeyArrayIndex++]; |
| } else { |
| long defaultValueAsLong; |
| Object defaultValue = dimensionInfo.getDefaultValues()[i]; |
| if (null != defaultValue) { |
| defaultValueAsLong = ((Integer) defaultValue).longValue(); |
| } else { |
| defaultValueAsLong = |
| new Integer(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY).longValue(); |
| } |
| keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong; |
| } |
| } |
| } |
| try { |
| dictionaryKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns); |
| } catch (KeyGenException e) { |
| LOGGER.error(e, e.getMessage()); |
| } |
| return dictionaryKeyArray; |
| } |
| |
| /** |
| * This method will fill the no dictionary byte array with newly added no dictionary columns |
| * |
| * @param noDictionaryKeyArray |
| * @return |
| */ |
| private byte[][] fillNoDictionaryKeyArrayWithLatestSchema(byte[][] noDictionaryKeyArray) { |
| QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions(); |
| byte[][] noDictionaryKeyArrayWithNewlyAddedColumns = |
| new byte[noDictionaryKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][]; |
| int existingColumnValueIndex = 0; |
| int newKeyArrayIndex = 0; |
| for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) { |
| if (!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) |
| && !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) { |
| // if dimension exists then add the byte array value else add the default value |
| if (dimensionInfo.getDimensionExists()[i]) { |
| noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = |
| noDictionaryKeyArray[existingColumnValueIndex++]; |
| } else { |
| byte[] newColumnDefaultValue = null; |
| Object defaultValue = dimensionInfo.getDefaultValues()[i]; |
| if (null != defaultValue) { |
| newColumnDefaultValue = ((UTF8String) defaultValue).getBytes(); |
| } else { |
| newColumnDefaultValue = |
| UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL).getBytes(); |
| } |
| noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = newColumnDefaultValue; |
| } |
| } |
| } |
| return noDictionaryKeyArrayWithNewlyAddedColumns; |
| } |
| } |