/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.query;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.TableProcessingOperations;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.log4j.Logger;
import org.apache.spark.sql.secondaryindex.exception.SecondaryIndexException;
import org.apache.spark.sql.secondaryindex.load.RowComparator;
import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil;
/**
* This class will process the query result and convert the data
* into a format compatible with data load
*/
public class SecondaryIndexQueryResultProcessor {
/**
* LOGGER
*/
private static final Logger LOGGER = LogServiceFactory.getLogService(
SecondaryIndexQueryResultProcessor.class.getName());
/**
* carbon load model that contains all the required information for load
*/
private CarbonLoadModel carbonLoadModel;
/**
* sortDataRows instance for sorting each row read and writing it to a sort temp file
*/
private SortDataRows sortDataRows;
/**
* segment properties which contain the required information for a segment
*/
private SegmentProperties segmentProperties;
/**
* segment information of parent table
*/
private SegmentProperties srcSegmentProperties;
/**
* final merger for merge sort
*/
private SingleThreadFinalSortFilesMerger finalMerger;
/**
* data handler VO object
*/
private CarbonFactHandler dataHandler;
/**
* column cardinality
*/
private int[] columnCardinality;
/**
* fact table to index table column mapping order
*/
private int[] factToIndexColumnMapping;
/**
* fact table dict column to index table dict column mapping
*/
private int[] factToIndexDictColumnMapping;
/**
* boolean mapping for no dictionary columns in schema
*/
private boolean[] noDictionaryColMapping;
private boolean[] sortColumnMapping;
/**
* agg type defined for measures
*/
private DataType[] aggType;
/**
* segment id
*/
private String segmentId;
/**
* temp store location to be used during data load
*/
private String[] tempStoreLocation;
/**
* database name
*/
private String databaseName;
/**
* no dictionary column count in schema
*/
private int noDictionaryCount;
/**
* implicit column count in schema
*/
private int implicitColumnCount;
/**
* total count of measures in schema
*/
private int measureCount;
/**
* dimension count excluding complex dimensions and no dictionary columns
*/
private int dimensionColumnCount;
/**
* complex dimension count in schema
*/
private int complexDimensionCount;
/**
* index table instance
*/
private CarbonTable indexTable;
/**
* whether the allocated tasks have any records
*/
private boolean isRecordFound;
/**
* boolean mapping for long string dimensions
*/
private boolean[] isVarcharDimMapping;
private SortIntermediateFileMerger intermediateFileMerger;
private SortParameters sortParameters;
public SecondaryIndexQueryResultProcessor(CarbonLoadModel carbonLoadModel,
int[] columnCardinality, String segmentId, CarbonTable indexTable,
int[] factToIndexColumnMapping) {
this.carbonLoadModel = carbonLoadModel;
this.columnCardinality = columnCardinality;
this.segmentId = segmentId;
this.indexTable = indexTable;
this.databaseName = carbonLoadModel.getDatabaseName();
this.factToIndexColumnMapping = factToIndexColumnMapping;
initSegmentProperties();
}
/**
* This method will iterate over the query result and convert it into a format compatible
* with data loading
*/
public void processQueryResult(List<CarbonIterator<RowBatch>> detailQueryResultIteratorList)
throws SecondaryIndexException {
try {
initTempStoreLocation();
initSortDataRows();
initAggType();
processResult(detailQueryResultIteratorList);
// After a delete command, if no records are fetched from a split,
// the below steps need not be initialized.
if (isRecordFound) {
initializeFinalThreadMergerForMergeSort();
initDataHandler();
readAndLoadDataFromSortTempFiles();
}
} finally {
// clear temp files and folders created during secondary index creation
String databaseName = carbonLoadModel.getDatabaseName();
String tempLocationKey = CarbonDataProcessorUtil.getTempStoreLocationKey(databaseName,
indexTable.getTableName(), carbonLoadModel.getSegmentId(), carbonLoadModel.getTaskNo(),
false, false);
TableProcessingOperations.deleteLocalDataLoadFolderLocation(tempLocationKey,
indexTable.getTableName());
}
}
public void close() {
if (null != sortDataRows) {
sortDataRows.close();
}
if (null != finalMerger) {
finalMerger.close();
}
if (null != dataHandler) {
dataHandler.finish();
dataHandler.closeHandler();
}
}
/**
* This method will iterate over the query result and perform row sorting operation
*/
private void processResult(List<CarbonIterator<RowBatch>> detailQueryResultIteratorList)
throws SecondaryIndexException {
for (CarbonIterator<RowBatch> detailQueryIterator : detailQueryResultIteratorList) {
DetailQueryResultIterator queryIterator = (DetailQueryResultIterator) detailQueryIterator;
BlockExecutionInfo blockExecutionInfo = queryIterator.getBlockExecutionInfo();
// get complex dimension info map from block execution info
Map<Integer, GenericQueryType> complexDimensionInfoMap =
blockExecutionInfo.getComplexDimensionInfoMap();
int[] complexColumnParentBlockIndexes =
blockExecutionInfo.getComplexColumnParentBlockIndexes();
while (detailQueryIterator.hasNext()) {
RowBatch batchResult = detailQueryIterator.next();
while (batchResult.hasNext()) {
addRowForSorting(prepareRowObjectForSorting(batchResult.next(), complexDimensionInfoMap,
complexColumnParentBlockIndexes));
isRecordFound = true;
}
}
}
try {
sortDataRows.startSorting();
} catch (CarbonSortKeyAndGroupByException e) {
this.sortDataRows.close();
LOGGER.error(e);
throw new SecondaryIndexException(
"Problem loading data while creating secondary index: " + e.getMessage());
}
}
/**
* This method will prepare the row from the raw object so that it can take part in sorting
*/
private Object[] prepareRowObjectForSorting(Object[] row,
Map<Integer, GenericQueryType> complexDimensionInfoMap, int[] complexColumnParentBlockIndexes)
throws SecondaryIndexException {
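// row[0] is a ByteArrayWrapper holding the dictionary, no dictionary, complex and
// implicit column byte arrays of the projected row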
ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
byte[] implicitColumnByteArray = wrapper.getImplicitColumnByteArray();
List<CarbonDimension> dimensions = segmentProperties.getDimensions();
Object[] preparedRow = new Object[dimensions.size() + measureCount];
Map<Integer, Object[]> complexDataMap = new HashMap<>();
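// running indexes into the wrapper's dictionary, no dictionary and complex arrays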
int noDictionaryIndex = 0;
int dictionaryIndex = 0;
int complexIndex = 0;
int i = 0;
// loop excluding the last dimension, as it is the implicit column.
for (; i < dimensions.size() - 1; i++) {
CarbonDimension dims = dimensions.get(i);
boolean isComplexColumn = false;
// As a complex column of the main table is stored as its primitive type in the SI,
// we need to check whether the dimension is a complex dimension based on the
// isParentColumnComplex property.
if (dims.getColumnProperties() != null && Boolean
.parseBoolean(dims.getColumnProperties().get("isParentColumnComplex"))) {
isComplexColumn = true;
}
// fill all the no dictionary and dictionary data into the prepared row first;
// fill the flattened complex data into the prepared row at the end
if (dims.hasEncoding(Encoding.DICTIONARY) && !isComplexColumn) {
// dictionary
preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
} else {
if (isComplexColumn && complexColumnParentBlockIndexes.length == 0) {
// After restructure some complex column will not be present in parent block.
// In such case, set the SI implicit row value to null or empty byte array.
if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
// set null value for measures
preparedRow[i] = null;
} else {
preparedRow[i] = new byte[0];
}
} else if (isComplexColumn) {
// get the flattened data of complex column
byte[] complexKeyByIndex = wrapper.getComplexKeyByIndex(complexIndex);
ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
GenericQueryType genericQueryType =
complexDimensionInfoMap.get(complexColumnParentBlockIndexes[complexIndex++]);
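// the flattened element count is read as a short at byte offset 2 of the
// serialized complex value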
int complexDataLength = byteArrayInput.getShort(2);
// in case the array is empty, still produce a single entry
if (complexDataLength == 0) {
complexDataLength = complexDataLength + 1;
}
// get flattened array data
Object[] complexFlattenedData = new Object[complexDataLength];
Object[] data = genericQueryType.getObjectArrayDataBasedOnDataType(byteArrayInput);
for (int index = 0; index < complexDataLength; index++) {
complexFlattenedData[index] =
getData(data, index, dims.getColumnSchema().getDataType());
}
// store the dimension column index and the complex column's flattened data in a map
complexDataMap.put(i, complexFlattenedData);
} else {
// no dictionary dims
byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
// no dictionary primitive columns are expected to be in their original form while
// loading, so convert them back to the original data
if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
Object dataFromBytes = DataTypeUtil
.getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex,
dims.getDataType());
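// the query surfaces timestamps at a finer granularity than the load path
// expects, so scale down by 1000 (presumably micros to millis)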
if (null != dataFromBytes && dims.getDataType() == DataTypes.TIMESTAMP) {
dataFromBytes = (long) dataFromBytes / 1000L;
}
preparedRow[i] = dataFromBytes;
} else {
preparedRow[i] = noDictionaryKeyByIndex;
}
}
}
}
// at last, add the implicit column position reference (PID)
preparedRow[i] = implicitColumnByteArray;
// In case of a complex array type, get the flattened data based on the dimension
// index, fill it into the prepared row element by element and add the rows for sorting.
// TODO Handle for nested array and other complex types
if (!complexDataMap.isEmpty()) {
Object[] firstRow = preparedRow;
for (Map.Entry<Integer, Object[]> dataEntry : complexDataMap.entrySet()) {
Object[] complexArrayData = dataEntry.getValue();
preparedRow[dataEntry.getKey()] = complexArrayData[0];
firstRow = preparedRow.clone();
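// keep the row holding the first array element; it is returned to the caller while
// the remaining flattened elements are added for sorting below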
if (complexArrayData.length != 1) {
for (int index = 1; index < complexArrayData.length; index++) {
preparedRow[dataEntry.getKey()] = complexArrayData[index];
addRowForSorting(preparedRow.clone());
}
}
}
preparedRow = firstRow;
}
return preparedRow;
}
/**
* This method will return the primitive data of a complex array element
*/
private Object getData(Object[] data, int index, DataType dataType) {
if (data == null || data.length == 0) {
if (DataTypeUtil.isPrimitiveColumn(dataType)) {
return null;
}
return new byte[0];
} else if (data[0] == null) {
if (DataTypeUtil.isPrimitiveColumn(dataType)) {
return null;
}
return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
}
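// timestamps are scaled back to millis and dates are shifted by the direct
// dictionary cut-off to restore the value expected by the load path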
if (dataType == DataTypes.TIMESTAMP && null != data[index]) {
return (long) data[index] / 1000L;
} else if (dataType == DataTypes.DATE) {
return (int) data[index] + DateDirectDictionaryGenerator.cutOffDate;
}
return data[index];
}
/**
* This method will read the sort temp files, perform merge sort and add the result to the store
* for data loading
*/
private void readAndLoadDataFromSortTempFiles() throws SecondaryIndexException {
Throwable throwable = null;
try {
Object[] previousRow = null;
// comparator for grouping similar data, since every record
// should be unique in the index table
RowComparator comparator = new RowComparator(noDictionaryColMapping,
SecondaryIndexUtil.getNoDictDataTypes(indexTable));
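// all rows have been added for sorting; finish intermediate merging so the
// sort temp files are ready for the final merge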
intermediateFileMerger.finish();
sortDataRows = null;
finalMerger.startFinalMerge();
while (finalMerger.hasNext()) {
Object[] rowRead = finalMerger.next();
if (null == previousRow) {
previousRow = rowRead;
} else {
int compareResult = comparator.compare(previousRow, rowRead);
if (0 == compareResult) {
// skip similar data rows
continue;
} else {
previousRow = rowRead;
}
}
CarbonRow row = new CarbonRow(rowRead);
dataHandler.addDataToStore(row);
}
dataHandler.finish();
} catch (CarbonDataWriterException e) {
LOGGER.error(e);
throw new SecondaryIndexException("Problem loading data while creating secondary index: ", e);
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error(e);
throw new SecondaryIndexException(
"Problem in merging intermediate files while creating secondary index: ", e);
} catch (Throwable t) {
LOGGER.error(t);
throw new SecondaryIndexException("Problem while creating secondary index: ", t);
} finally {
if (null != dataHandler) {
try {
dataHandler.closeHandler();
} catch (CarbonDataWriterException e) {
LOGGER.error(e);
throwable = e;
}
}
}
if (null != throwable) {
throw new SecondaryIndexException(
"Problem closing data handler while creating secondary index: ", throwable);
}
dataHandler = null;
}
/**
* initialise segment properties
*/
private void initSegmentProperties() {
List<ColumnSchema> columnSchemaList = CarbonUtil.getColumnSchemaList(
indexTable.getVisibleDimensions(), indexTable.getVisibleMeasures());
segmentProperties = new SegmentProperties(columnSchemaList);
srcSegmentProperties = new SegmentProperties(getParentColumnOrder(columnSchemaList));
}
/**
* Convert index table column order into parent table column order
*/
private List<ColumnSchema> getParentColumnOrder(List<ColumnSchema> columnSchemaList) {
List<ColumnSchema> parentColumnList = new ArrayList<>(columnSchemaList.size());
for (int i = 0; i < columnSchemaList.size(); i++) {
// Extra cols are dummy_measure & positionId implicit column
if (i >= columnCardinality.length) {
parentColumnList.add(columnSchemaList.get(i));
} else {
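// remap through factToIndexColumnMapping to recover the parent table's column order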
parentColumnList.add(columnSchemaList.get(factToIndexColumnMapping[i]));
}
}
return parentColumnList;
}
/**
* add row to a temp array which will be written to a sort temp file after sorting
*/
private void addRowForSorting(Object[] row) throws SecondaryIndexException {
try {
// add the row for sorting; it will be written to a sort temp file
sortDataRows.addRow(row);
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error(e);
this.sortDataRows.close();
throw new SecondaryIndexException(
"Row addition for sorting failed while creating secondary index: " + e.getMessage());
}
}
/**
* create an instance of sort data rows
*/
private void initSortDataRows() throws SecondaryIndexException {
measureCount = indexTable.getVisibleMeasures().size();
implicitColumnCount = indexTable.getImplicitDimensions().size();
List<CarbonDimension> dimensions = indexTable.getVisibleDimensions();
noDictionaryColMapping = new boolean[dimensions.size()];
sortColumnMapping = new boolean[dimensions.size()];
isVarcharDimMapping = new boolean[dimensions.size()];
int i = 0;
for (CarbonDimension dimension : dimensions) {
if (dimension.isSortColumn()) {
sortColumnMapping[i] = true;
}
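// dictionary encoded dimensions are neither no dictionary nor varchar columns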
if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
i++;
continue;
}
noDictionaryColMapping[i] = true;
if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
isVarcharDimMapping[i] = true;
}
i++;
noDictionaryCount++;
}
dimensionColumnCount = dimensions.size();
sortParameters = createSortParameters();
CarbonDataProcessorUtil.deleteSortLocationIfExists(sortParameters.getTempFileLocation());
CarbonDataProcessorUtil.createLocations(sortParameters.getTempFileLocation());
intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
this.sortDataRows = new SortDataRows(sortParameters, intermediateFileMerger);
this.sortDataRows.initialize();
}
/**
* This method will create the sort parameters VO object
*/
private SortParameters createSortParameters() {
int numberOfCompactingCores = CarbonProperties.getInstance().getNumberOfCompactingCores();
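// only half of the configured compacting cores are passed to the sort,
// presumably to leave headroom for other concurrent work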
return SortParameters.createSortParameters(indexTable, databaseName, indexTable.getTableName(),
dimensionColumnCount, complexDimensionCount, measureCount, noDictionaryCount, segmentId,
carbonLoadModel.getTaskNo(), noDictionaryColMapping, sortColumnMapping, isVarcharDimMapping,
false, numberOfCompactingCores / 2);
}
/**
* create an instance of the final thread merger which will perform merge sort on all the
* sort temp files
*/
private void initializeFinalThreadMergerForMergeSort() {
String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
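// the final merger reads back all the sort temp files written under these locations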
sortParameters.setNoDictionarySortColumn(
CarbonDataProcessorUtil.getNoDictSortColMapping(indexTable));
finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation,
indexTable.getTableName(), sortParameters);
}
/**
* initialise carbon data writer instance
*/
private void initDataHandler() throws SecondaryIndexException {
String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(this.indexTable,
segmentId);
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(carbonLoadModel, indexTable,
segmentProperties, indexTable.getTableName(), tempStoreLocation, carbonStoreLocation);
carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(indexTable.getTableLastUpdatedTime());
CarbonDataFileAttributes carbonDataFileAttributes = new CarbonDataFileAttributes(
Integer.parseInt(carbonLoadModel.getTaskNo()), carbonLoadModel.getFactTimeStamp());
carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel);
try {
dataHandler.initialise();
} catch (CarbonDataWriterException e) {
this.sortDataRows.close();
LOGGER.error(e);
throw new SecondaryIndexException(
"Problem initialising data handler while creating secondary index: " + e.getMessage());
}
}
/**
* initialise temporary store location
*/
private void initTempStoreLocation() {
tempStoreLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(indexTable,
carbonLoadModel.getTaskNo(), segmentId, false, false);
}
/**
* initialise aggregation types for measures based on their storage format
*/
private void initAggType() {
aggType = CarbonDataProcessorUtil.initDataType(indexTable, indexTable.getTableName(),
measureCount);
}
}