/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.merger;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.log4j.Logger;
/**
 * This class will process the query result and convert the data
 * into a format compatible with data loading
 */
public class CompactionResultSortProcessor extends AbstractResultProcessor {
/**
* LOGGER
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(CompactionResultSortProcessor.class.getName());
/**
* carbon load model that contains all the required information for load
*/
private CarbonLoadModel carbonLoadModel;
/**
* carbon table
*/
private CarbonTable carbonTable;
  /**
   * sortDataRows instance for sorting each row that is read and writing it to a sort temp file
   */
private SortDataRows sortDataRows;
/**
* final merger for merge sort
*/
private SingleThreadFinalSortFilesMerger finalMerger;
  /**
   * fact data handler that writes the merged rows to the carbondata store
   */
private CarbonFactHandler dataHandler;
/**
* segment properties for getting dimension cardinality and other required information of a block
*/
private SegmentProperties segmentProperties;
  /**
   * compaction type to decide whether the taskID needs to be extracted from carbondata files
   */
private CompactionType compactionType;
/**
* boolean mapping for no dictionary columns in schema
*/
private boolean[] noDictionaryColMapping;
  /**
   * boolean mapping for sort columns in schema
   */
  private boolean[] sortColumnMapping;
/**
* boolean mapping for long string dimension
*/
private boolean[] isVarcharDimMapping;
  /**
   * data types defined for measures
   */
private DataType[] dataTypes;
/**
* segment id
*/
private String segmentId;
  /**
   * temp store location to be used during data load
   */
private String[] tempStoreLocation;
/**
* table name
*/
private String tableName;
/**
* no dictionary column count in schema
*/
private int noDictionaryCount;
/**
* total count of measures in schema
*/
private int measureCount;
  /**
   * total dimension count including complex dimensions and no dictionary columns
   */
private int dimensionColumnCount;
  /**
   * all dimensions in the table
   */
private List<CarbonDimension> dimensions;
  /**
   * whether any record was found for the allocated task
   */
private boolean isRecordFound;
/**
* intermediate sort merger
*/
private SortIntermediateFileMerger intermediateFileMerger;
  /**
   * partition spec of the segment in case of a partitioned table
   */
  private PartitionSpec partitionSpec;
  /**
   * sort parameters used by the sorter and the mergers
   */
  private SortParameters sortParameters;
  /**
   * no dictionary and complex columns of the table
   */
  private CarbonColumn[] noDicAndComplexColumns;
public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
SegmentProperties segmentProperties, CompactionType compactionType, String tableName,
PartitionSpec partitionSpec) {
this.carbonLoadModel = carbonLoadModel;
this.carbonTable = carbonTable;
this.segmentProperties = segmentProperties;
this.segmentId = carbonLoadModel.getSegmentId();
this.compactionType = compactionType;
this.tableName = tableName;
this.partitionSpec = partitionSpec;
}
  /**
   * This method will iterate over the query result and convert it into a format compatible
   * with data loading
   *
   * @param unsortedResultIteratorList iterators over rows that still need to be sorted
   * @param sortedResultIteratorList iterators over rows that are already sorted and can be
   * merged in directly
   * @return whether the compaction succeeded
   * @throws Exception
   */
public boolean execute(List<RawResultIterator> unsortedResultIteratorList,
List<RawResultIterator> sortedResultIteratorList) throws Exception {
boolean isCompactionSuccess = false;
try {
initTempStoreLocation();
initSortDataRows();
dataTypes = CarbonDataProcessorUtil.initDataType(carbonTable, tableName, measureCount);
processResult(unsortedResultIteratorList);
      // After a delete command, if no records are fetched from a split,
      // the steps below need not be initialized.
if (isRecordFound) {
initializeFinalThreadMergerForMergeSort();
initDataHandler();
readAndLoadDataFromSortTempFiles(sortedResultIteratorList);
}
isCompactionSuccess = true;
} catch (Exception e) {
LOGGER.error(e.getLocalizedMessage(), e);
throw e;
} finally {
if (partitionSpec != null) {
        SegmentFileStore
            .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(),
                partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
                partitionSpec.getPartitions());
}
// clear temp files and folders created during compaction
deleteTempStoreLocation();
}
return isCompactionSuccess;
}
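  // Illustrative usage (a minimal sketch; loadModel, table, segmentProperties, compactionType
  // and the iterator lists are assumed to be prepared by the caller, as the surrounding
  // compaction flow normally does):
  //
  //   CompactionResultSortProcessor processor = new CompactionResultSortProcessor(
  //       loadModel, table, segmentProperties, compactionType, table.getTableName(), null);
  //   try {
  //     boolean success = processor.execute(unsortedIterators, sortedIterators);
  //   } finally {
  //     processor.close();
  //   }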
@Override
public void close() {
// close the sorter executor service
if (null != sortDataRows) {
sortDataRows.close();
}
// close the final merger
if (null != finalMerger) {
finalMerger.close();
}
// close data handler
if (null != dataHandler) {
dataHandler.closeHandler();
}
}
/**
* This method will clean up the local folders and files created during compaction process
*/
private void deleteTempStoreLocation() {
if (null != tempStoreLocation) {
for (String tempLoc : tempStoreLocation) {
try {
CarbonUtil.deleteFoldersAndFiles(new File(tempLoc));
} catch (IOException | InterruptedException e) {
LOGGER.error("Problem deleting local folders during compaction: " + e.getMessage(), e);
}
}
}
}
  /**
   * This method will iterate over the query result and perform the row sorting operation
   *
   * @param resultIteratorList list of iterators over the query result
   */
private void processResult(List<RawResultIterator> resultIteratorList) throws Exception {
for (RawResultIterator resultIterator : resultIteratorList) {
if (CompactionType.STREAMING == compactionType) {
while (resultIterator.hasNext()) {
// the input iterator of streaming segment is already using raw row
addRowForSorting(prepareStreamingRowObjectForSorting(resultIterator.next()));
isRecordFound = true;
}
} else {
while (resultIterator.hasNext()) {
addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
isRecordFound = true;
}
}
resultIterator.close();
}
try {
sortDataRows.startSorting();
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error(e.getMessage(), e);
throw new Exception("Problem loading data during compaction: " + e.getMessage(), e);
}
}
  /**
   * This method will prepare the row from the raw object so that it can take part in sorting
   *
   * @param row raw row from the streaming segment
   * @return row prepared for sorting
   */
private Object[] prepareStreamingRowObjectForSorting(Object[] row) {
Object[] preparedRow = new Object[dimensions.size() + measureCount];
for (int i = 0; i < dimensions.size(); i++) {
CarbonDimension dims = dimensions.get(i);
if (dims.hasEncoding(Encoding.DICTIONARY)) {
// dictionary
preparedRow[i] = row[i];
} else {
// no dictionary dims
if (DataTypeUtil.isPrimitiveColumn(dims.getDataType()) && !dims.isComplex()) {
          // primitive no dictionary columns are expected to hold the original data
preparedRow[i] = DataTypeUtil
.getDataBasedOnDataTypeForNoDictionaryColumn((byte[]) row[i], dims.getDataType());
// for timestamp the above method will give the original data, so it should be
// converted again to the format to be loaded (without micros)
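          // e.g. 1609459200123456 (micros) becomes 1609459200123 (millis)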
if (null != preparedRow[i] && dims.getDataType() == DataTypes.TIMESTAMP) {
preparedRow[i] = (long) preparedRow[i] / 1000L;
}
} else {
preparedRow[i] = row[i];
}
}
}
// fill all the measures
for (int i = 0; i < measureCount; i++) {
preparedRow[dimensionColumnCount + i] = row[dimensionColumnCount + i];
}
return preparedRow;
}
  /**
   * This method will prepare the row from the raw object so that it can take part in sorting
   *
   * @param row raw row from the query result
   * @return row prepared for sorting
   */
private Object[] prepareRowObjectForSorting(Object[] row) {
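    // Raw row layout (a sketch derived from the code below):
    //   row[0]   -> ByteArrayWrapper holding the dictionary MDKey, the no dictionary byte
    //               arrays and the complex column byte arrays
    //   row[1..] -> measure values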
ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
Object[] preparedRow = new Object[dimensions.size() + measureCount];
// convert the dictionary from MDKey to surrogate key
byte[] dictionaryKey = wrapper.getDictionaryKey();
long[] keyArray = segmentProperties.getDimensionKeyGenerator().getKeyArray(dictionaryKey);
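    // e.g. (hypothetical values) a packed MDKey for three dictionary columns unpacks to
    // one surrogate key per column, such as [3, 12, 7]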
Object[] dictionaryValues = new Object[dimensionColumnCount + measureCount];
    for (int i = 0; i < keyArray.length; i++) {
      dictionaryValues[i] = (int) keyArray[i];
    }
int noDictionaryIndex = 0;
int dictionaryIndex = 0;
int complexIndex = 0;
for (int i = 0; i < dimensions.size(); i++) {
CarbonDimension dims = dimensions.get(i);
if (dims.hasEncoding(Encoding.DICTIONARY) && !dims.isComplex()) {
// dictionary
preparedRow[i] = dictionaryValues[dictionaryIndex++];
} else if (!dims.isComplex()) {
// no dictionary dims
byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
          // primitive no dictionary columns are expected to hold the original data
preparedRow[i] = DataTypeUtil
.getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex,
dims.getDataType());
// for timestamp the above method will give the original data, so it should be
// converted again to the format to be loaded (without micros)
if (null != preparedRow[i] && dims.getDataType() == DataTypes.TIMESTAMP) {
preparedRow[i] = (long) preparedRow[i] / 1000L;
}
} else {
preparedRow[i] = noDictionaryKeyByIndex;
}
} else {
preparedRow[i] = wrapper.getComplexKeyByIndex(complexIndex++);
}
}
// fill all the measures
// measures will always start from 1st index in the row object array
int measureIndexInRow = 1;
for (int i = 0; i < measureCount; i++) {
preparedRow[dimensionColumnCount + i] =
getConvertedMeasureValue(row[measureIndexInRow++], dataTypes[i]);
}
return preparedRow;
}
  /**
   * This method will convert the Spark decimal to the Java BigDecimal type
   *
   * @param value measure value to convert
   * @param type data type of the measure
   * @return converted value
   */
  private Object getConvertedMeasureValue(Object value, DataType type) {
    if (DataTypes.isDecimal(type) && null != value) {
      value = DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value);
    }
    return value;
  }
/**
* This method will read sort temp files, perform merge sort and add it to store for data loading
*/
private void readAndLoadDataFromSortTempFiles(List<RawResultIterator> sortedRawResultIterator)
throws Exception {
try {
intermediateFileMerger.finish();
finalMerger.startFinalMerge();
if (sortedRawResultIterator != null && sortedRawResultIterator.size() > 0) {
finalMerger.addInMemoryRawResultIterator(sortedRawResultIterator, segmentProperties,
noDicAndComplexColumns, dataTypes);
}
while (finalMerger.hasNext()) {
Object[] row = finalMerger.next();
dataHandler.addDataToStore(new CarbonRow(row));
}
dataHandler.finish();
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      throw new Exception("Problem loading data during compaction.", e);
} finally {
if (null != dataHandler) {
try {
dataHandler.closeHandler();
} catch (CarbonDataWriterException e) {
LOGGER.error("Error in close data handler", e);
throw new Exception("Error in close data handler", e);
}
}
}
}
  /**
   * add row to a temp array which will be written to a sort temp file after sorting
   *
   * @param row row to be added for sorting
   */
private void addRowForSorting(Object[] row) throws Exception {
try {
sortDataRows.addRow(row);
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error(e.getMessage(), e);
throw new Exception("Row addition for sorting failed during compaction: "
+ e.getMessage(), e);
}
}
/**
* create an instance of sort data rows
*/
private void initSortDataRows() throws Exception {
measureCount = carbonTable.getVisibleMeasures().size();
dimensions = new ArrayList<>(2);
dimensions.addAll(segmentProperties.getDimensions());
dimensions.addAll(segmentProperties.getComplexDimensions());
noDictionaryColMapping = new boolean[dimensions.size()];
sortColumnMapping = new boolean[dimensions.size()];
isVarcharDimMapping = new boolean[dimensions.size()];
int i = 0;
for (CarbonDimension dimension : dimensions) {
if (dimension.isSortColumn()) {
sortColumnMapping[i] = true;
}
if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
i++;
continue;
}
noDictionaryColMapping[i] = true;
if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
isVarcharDimMapping[i] = true;
}
i++;
noDictionaryCount++;
}
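    // For illustration, given three hypothetical dimensions (dictSortCol: a dictionary sort
    // column, noDictCol: a plain string, longStrCol: a VARCHAR), the mappings built above
    // would be:
    //   noDictionaryColMapping -> [false, true, true]
    //   sortColumnMapping      -> [true, false, false]
    //   isVarcharDimMapping    -> [false, false, true]
    // and noDictionaryCount would be 2.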
dimensionColumnCount = dimensions.size();
sortParameters = createSortParameters();
intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
    // TODO: Currently only on-heap merge is supported, but unsafe merge could be supported
    // as well by using UnsafeSortDataRows.
this.sortDataRows = new SortDataRows(sortParameters, intermediateFileMerger);
try {
this.sortDataRows.initialize();
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error(e.getMessage(), e);
throw new Exception(
"Error initializing sort data rows object during compaction: " + e.getMessage(), e);
}
}
  /**
   * This method will create the sort parameters VO object
   *
   * @return sort parameters
   */
private SortParameters createSortParameters() {
int numberOfCompactingCores = CarbonProperties.getInstance().getNumberOfCompactingCores();
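    // half of the configured compacting cores go to the sorter, e.g. 4 cores -> 2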
return SortParameters
.createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping,
sortColumnMapping, isVarcharDimMapping, true, numberOfCompactingCores / 2);
}
  /**
   * create an instance of the final thread merger which will perform merge sort on all
   * the sort temp files
   */
private void initializeFinalThreadMergerForMergeSort() {
boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
.getNoDictSortColMapping(carbonTable);
sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
finalMerger =
new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, sortParameters);
}
/**
* initialise carbon data writer instance
*/
private void initDataHandler() throws Exception {
String carbonStoreLocation;
if (partitionSpec != null) {
carbonStoreLocation =
partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR
+ carbonLoadModel.getFactTimeStamp() + ".tmp";
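      // e.g. (hypothetical values) partition location /warehouse/db/t/p=1 with
      // factTimeStamp 1609459200000 -> /warehouse/db/t/p=1/1609459200000.tmp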
} else {
carbonStoreLocation = CarbonDataProcessorUtil
.createCarbonStoreLocation(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
carbonLoadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
tempStoreLocation, carbonStoreLocation);
carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId());
setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel);
try {
dataHandler.initialise();
} catch (CarbonDataWriterException e) {
LOGGER.error(e.getMessage(), e);
throw new Exception("Problem initialising data handler during compaction: "
+ e.getMessage(), e);
}
}
/**
* initialise temporary store location
*/
private void initTempStoreLocation() {
tempStoreLocation = CarbonDataProcessorUtil
.getLocalDataFolderLocation(carbonTable, carbonLoadModel.getTaskNo(),
segmentId, true, false);
}
}