/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.util;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.SortScopeOptions;
import org.apache.carbondata.core.metadata.DatabaseLocationProvider;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
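/**
 * Utility methods used across the CarbonData load and processing flows.
 */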
public final class CarbonDataProcessorUtil {
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonDataProcessorUtil.class.getName());
private CarbonDataProcessorUtil() {
}
  /**
   * This method will be used to delete the sort temp location if it exists
   */
public static void deleteSortLocationIfExists(String[] locations) {
for (String loc : locations) {
File file = new File(loc);
if (file.exists()) {
try {
CarbonUtil.deleteFoldersAndFiles(file);
} catch (IOException | InterruptedException e) {
LOGGER.error("Failed to delete " + loc, e);
}
}
}
}
/**
* This method will be used to create dirs
* @param locations locations to create
*/
public static void createLocations(String[] locations) {
for (String loc : locations) {
File dir = new File(loc);
if (dir.exists()) {
LOGGER.warn("dir already exists, skip dir creation: " + loc);
} else {
if (!dir.mkdirs() && !dir.exists()) {
          // in a concurrent scenario mkdir may fail, so re-check whether the dir exists
          LOGGER.error("Error occurred while creating dir: " + loc);
} else {
LOGGER.info("Successfully created dir: " + loc);
}
}
}
}
  /**
   * This method will form the local data folder store location
   *
   * @param carbonTable carbon table being loaded
   * @param taskId id of the current load task
   * @param segmentId id of the segment being written
   * @param isCompactionFlow true if invoked from the compaction flow
   * @param isAltPartitionFlow true if invoked from the alter-partition flow
   * @return local data folder locations, one per configured temp store path
   */
public static String[] getLocalDataFolderLocation(CarbonTable carbonTable,
String taskId, String segmentId, boolean isCompactionFlow, boolean isAltPartitionFlow) {
String tempLocationKey =
getTempStoreLocationKey(carbonTable.getDatabaseName(), carbonTable.getTableName(),
segmentId, taskId, isCompactionFlow, isAltPartitionFlow);
String baseTempStorePath = CarbonProperties.getInstance()
.getProperty(tempLocationKey);
if (baseTempStorePath == null) {
LOGGER.warn("Location not set for the key " + tempLocationKey
+ ". This will occur during a global-sort loading,"
+ " in this case local dirs will be chosen by spark");
baseTempStorePath = "./store.location";
}
String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator);
String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length];
for (int i = 0; i < baseTmpStorePathArray.length; i++) {
String tmpStore = baseTmpStorePathArray[i];
String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId);
localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
}
return localDataFolderLocArray;
}
  /**
   * This method will form the key for getting the temporary location set in carbon properties
   *
   * @param databaseName database name
   * @param tableName table name
   * @param segmentId id of the segment being processed
   * @param taskId id of the current task
   * @param isCompactionFlow true if invoked from the compaction flow
   * @param isAltPartitionFlow true if invoked from the alter-partition flow
   * @return key to look up the temporary store location
   */
public static String getTempStoreLocationKey(String databaseName, String tableName,
String segmentId, String taskId, boolean isCompactionFlow, boolean isAltPartitionFlow) {
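    // base key format: <database>_<tableName>_<segmentId>_<taskId>; the compaction and
    // alter-partition flows prefix it further below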
String tempLocationKey = DatabaseLocationProvider.get().provide(databaseName)
+ CarbonCommonConstants.UNDERSCORE + tableName
+ CarbonCommonConstants.UNDERSCORE + segmentId + CarbonCommonConstants.UNDERSCORE + taskId;
if (isCompactionFlow) {
tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + CarbonCommonConstants.UNDERSCORE
+ tempLocationKey;
}
if (isAltPartitionFlow) {
tempLocationKey = CarbonCommonConstants.ALTER_PARTITION_KEY_WORD +
CarbonCommonConstants.UNDERSCORE + tempLocationKey;
}
return tempLocationKey;
}
  /**
   * Prepare a boolean[] that maps whether each dimension is a no dictionary column or not.
   */
public static boolean[] getNoDictionaryMapping(DataField[] fields) {
List<Boolean> noDictionaryMapping = new ArrayList<Boolean>();
for (DataField field : fields) {
      // stop at the first complex type column
if (field.getColumn().isComplex()) {
break;
}
if (!field.isDateDataType() && field.getColumn().isDimension()) {
noDictionaryMapping.add(true);
} else if (field.getColumn().isDimension()) {
noDictionaryMapping.add(false);
}
}
return ArrayUtils
.toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
}
  /**
   * Prepare a boolean[] that maps whether each dimension is of varchar data type or not.
   */
public static boolean[] getIsVarcharColumnMapping(DataField[] fields) {
List<Boolean> isVarcharColumnMapping = new ArrayList<Boolean>();
for (DataField field : fields) {
      // stop at the first complex type column
if (field.getColumn().isComplex()) {
break;
}
if (field.getColumn().isDimension()) {
isVarcharColumnMapping.add(
field.getColumn().getColumnSchema().getDataType() == DataTypes.VARCHAR);
}
}
return ArrayUtils.toPrimitive(
isVarcharColumnMapping.toArray(new Boolean[isVarcharColumnMapping.size()]));
}
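  /**
   * Prepare a boolean[] that maps whether each dimension is a no dictionary (non-DATE)
   * column, stopping at the first complex type column.
   */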
public static boolean[] getNoDictionaryMapping(CarbonColumn[] carbonColumns) {
List<Boolean> noDictionaryMapping = new ArrayList<Boolean>();
for (CarbonColumn column : carbonColumns) {
      // stop at the first complex type column
if (column.isComplex()) {
break;
}
if (column.getDataType() != DataTypes.DATE && column.isDimension()) {
noDictionaryMapping.add(true);
} else if (column.isDimension()) {
noDictionaryMapping.add(false);
}
}
return ArrayUtils
.toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
}
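  /**
   * Serializes all complex dimensions among the data fields into a delimited string of
   * name:dataType:parent:isDictionary:columnId entries (see addAllComplexTypeChildren),
   * with children separated by the hash delimiter and hierarchies by the semicolon delimiter.
   */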
private static String getComplexTypeString(DataField[] dataFields) {
StringBuilder dimString = new StringBuilder();
for (DataField dataField : dataFields) {
if (dataField.getColumn().getDataType().isComplexType()) {
addAllComplexTypeChildren((CarbonDimension) dataField.getColumn(), dimString, "");
dimString.append(CarbonCommonConstants.SEMICOLON_SPC_CHARACTER);
}
}
return dimString.toString();
}
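  /**
   * Returns "true" if the dimension is a dictionary column (i.e. of DATE type), else "false".
   */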
  private static String isDictionaryType(CarbonDimension dimension) {
    return String.valueOf(dimension.getDataType() == DataTypes.DATE);
  }
  /**
   * This method appends all the child dimensions under a complex dimension to the given
   * string builder
   */
private static void addAllComplexTypeChildren(CarbonDimension dimension, StringBuilder dimString,
String parent) {
dimString.append(dimension.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(dimension.getDataType()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(parent).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(isDictionaryType(dimension)).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(dimension.getColumnId()).append(CarbonCommonConstants.HASH_SPC_CHARACTER);
for (int i = 0; i < dimension.getNumberOfChild(); i++) {
CarbonDimension childDim = dimension.getListOfChildDimensions().get(i);
if (childDim.getNumberOfChild() > 0) {
addAllComplexTypeChildren(childDim, dimString, dimension.getColName());
} else {
dimString.append(childDim.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(childDim.getDataType()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(dimension.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(isDictionaryType(dimension)).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(childDim.getColumnId()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(childDim.getOrdinal()).append(CarbonCommonConstants.HASH_SPC_CHARACTER);
}
}
}
  // TODO: simplify this; building the intermediate string representation should not be required.
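  /**
   * Builds a map from each complex column name to its GenericDataType tree
   * (array/struct/primitive), reconstructed from the serialized complex type string.
   */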
public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFields,
String nullFormat) {
String complexTypeString = getComplexTypeString(dataFields);
    if (complexTypeString.isEmpty()) {
      return new LinkedHashMap<>();
    }
Map<String, GenericDataType> complexTypesMap = new LinkedHashMap<String, GenericDataType>();
String[] hierarchies = complexTypeString.split(CarbonCommonConstants.SEMICOLON_SPC_CHARACTER);
for (int i = 0; i < hierarchies.length; i++) {
String[] levels = hierarchies[i].split(CarbonCommonConstants.HASH_SPC_CHARACTER);
String[] levelInfo = levels[0].split(CarbonCommonConstants.COLON_SPC_CHARACTER);
String level1Info = levelInfo[1].toLowerCase();
GenericDataType g = (level1Info.contains(CarbonCommonConstants.ARRAY) || level1Info
.contains(CarbonCommonConstants.MAP)) ?
new ArrayDataType(levelInfo[0], "", levelInfo[3]) :
new StructDataType(levelInfo[0], "", levelInfo[3]);
complexTypesMap.put(levelInfo[0], g);
for (int j = 1; j < levels.length; j++) {
levelInfo = levels[j].split(CarbonCommonConstants.COLON_SPC_CHARACTER);
String levelInfo1 = levelInfo[1].toLowerCase();
if (levelInfo1.contains(CarbonCommonConstants.ARRAY) || levelInfo1
.contains(CarbonCommonConstants.MAP)) {
g.addChildren(new ArrayDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
} else if (levelInfo[1].toLowerCase().contains(CarbonCommonConstants.STRUCT)) {
g.addChildren(new StructDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
} else {
g.addChildren(
new PrimitiveDataType(levelInfo[0], DataTypeUtil.valueOf(levelInfo[1]),
levelInfo[2], levelInfo[4], levelInfo[3].contains("true"), nullFormat
));
}
}
}
return complexTypesMap;
}
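  /**
   * Validates that the CSV header contains all schema columns of the carbon table,
   * except those listed in ignoreColumns.
   */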
public static boolean isHeaderValid(String tableName, String[] csvHeader,
CarbonDataLoadSchema schema, List<String> ignoreColumns) {
Iterator<String> columnIterator =
CarbonDataProcessorUtil.getSchemaColumnNames(schema).iterator();
Set<String> csvColumns = new HashSet<String>(csvHeader.length);
Collections.addAll(csvColumns, csvHeader);
// file header should contain all columns of carbon table.
// So csvColumns should contain all elements of columnIterator.
while (columnIterator.hasNext()) {
String column = columnIterator.next().toLowerCase();
if (!csvColumns.contains(column) && !ignoreColumns.contains(column)) {
return false;
}
}
return true;
}
  /**
   * This method returns the names of all visible schema columns (dimensions and measures),
   * excluding the spatial index column if one is configured
   *
   * @param schema carbon data load schema
   */
public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema) {
Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonDimension> dimensions =
schema.getCarbonTable().getVisibleDimensions();
Map<String, String> properties =
schema.getCarbonTable().getTableInfo().getFactTable().getTableProperties();
String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
for (CarbonDimension dimension : dimensions) {
      // skip the spatial index column as it is not a schema column
      if (spatialProperty == null || !dimension.getColName()
          .equalsIgnoreCase(spatialProperty.trim())) {
        columnNames.add(dimension.getColName());
}
}
List<CarbonMeasure> measures = schema.getCarbonTable().getVisibleMeasures();
for (CarbonMeasure msr : measures) {
columnNames.add(msr.getColName());
}
return columnNames;
}
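  /**
   * Returns the data types of the first measureCount visible measures of the table.
   */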
  public static DataType[] getMeasureDataType(int measureCount, CarbonTable carbonTable) {
    DataType[] type = new DataType[measureCount];
    List<CarbonMeasure> measures = carbonTable.getVisibleMeasures();
    for (int i = 0; i < measureCount; i++) {
      type[i] = measures.get(i).getDataType();
    }
    return type;
  }
  /**
   * Get the data types of the no dictionary sort columns of the table
   *
   * @param carbonTable
   * @return data types of the no dictionary sort columns
   */
public static DataType[] getNoDictDataTypes(CarbonTable carbonTable) {
List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
List<DataType> type = new ArrayList<>();
for (int i = 0; i < dimensions.size(); i++) {
if (dimensions.get(i).isSortColumn() && dimensions.get(i).getDataType() != DataTypes.DATE) {
type.add(dimensions.get(i).getDataType());
}
}
return type.toArray(new DataType[type.size()]);
}
  /**
   * Get the data types of the visible no dictionary sort columns, in data field order
   *
   * @param dataFields
   * @return data types of the no dictionary sort columns
   */
public static DataType[] getNoDictDataTypesAsDataFieldOrder(DataField[] dataFields) {
List<DataType> type = new ArrayList<>();
for (DataField dataField : dataFields) {
if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
if (dataField.getColumn().getColumnSchema().isSortColumn()
&& dataField.getColumn().getColumnSchema().getDataType() != DataTypes.DATE) {
type.add(dataField.getColumn().getColumnSchema().getDataType());
}
}
}
return type.toArray(new DataType[type.size()]);
}
/**
* Get the no dictionary sort column mapping of the table
*
* @param carbonTable
* @return
*/
public static boolean[] getNoDictSortColMapping(CarbonTable carbonTable) {
List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
List<Boolean> noDicSortColMap = new ArrayList<>();
for (int i = 0; i < dimensions.size(); i++) {
if (dimensions.get(i).isSortColumn()) {
if (dimensions.get(i).getDataType() != DataTypes.DATE) {
noDicSortColMap.add(true);
} else {
noDicSortColMap.add(false);
}
}
}
    return ArrayUtils.toPrimitive(noDicSortColMap.toArray(new Boolean[0]));
}
/**
* get mapping based on data fields order
*
* @param dataFields
* @return
*/
public static boolean[] getNoDictSortColMappingAsDataFieldOrder(DataField[] dataFields) {
List<Boolean> noDicSortColMap = new ArrayList<>();
for (DataField dataField : dataFields) {
if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
if (dataField.getColumn().getColumnSchema().isSortColumn()) {
if (dataField.getColumn().getColumnSchema().getDataType() != DataTypes.DATE) {
noDicSortColMap.add(true);
} else {
noDicSortColMap.add(false);
}
}
}
}
    return ArrayUtils.toPrimitive(noDicSortColMap.toArray(new Boolean[0]));
}
  /**
   * A dimension added in the older version 1.1 is a sort column by default, so during the
   * initial sort the carbon row is ordered with the newly added sort columns at the beginning.
   * Before the final sort merge, however, the data must be in schema order
   * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the carbon row in
   * schema order), so this method finds the indexes of the no dictionary sort columns in the
   * carbon row data.
   */
public static int[] getColumnIdxBasedOnSchemaInRow(CarbonTable carbonTable) {
List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
List<Integer> noDicSortColMap = new ArrayList<>();
int counter = 0;
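    // counter tracks each dimension's position in the carbon row; DATE (dictionary)
    // columns are excluded from the count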
for (CarbonDimension dimension : dimensions) {
if (dimension.getDataType() == DataTypes.DATE) {
continue;
}
if (dimension.isSortColumn() && DataTypeUtil.isPrimitiveColumn(dimension.getDataType())) {
noDicSortColMap.add(counter);
}
counter++;
}
    return ArrayUtils.toPrimitive(noDicSortColMap.toArray(new Integer[0]));
}
  /**
   * A dimension added in the older version 1.1 is a sort column by default, so during the
   * initial sort the carbon row is ordered with the newly added sort columns at the beginning.
   * Before the final sort merge, however, the data must be in schema order
   * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the carbon row in
   * schema order), so this method finds the indexes of the no dictionary sort columns in the
   * carbon row data, based on the data field order.
   */
public static int[] getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(DataField[] dataFields) {
List<Integer> noDicSortColMap = new ArrayList<>();
int counter = 0;
for (DataField dataField : dataFields) {
if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
if (dataField.getColumn().getColumnSchema().getDataType() == DataTypes.DATE) {
continue;
}
if (dataField.getColumn().getColumnSchema().isSortColumn() && DataTypeUtil
.isPrimitiveColumn(dataField.getColumn().getColumnSchema().getDataType())) {
noDicSortColMap.add(counter);
}
counter++;
}
}
    return ArrayUtils.toPrimitive(noDicSortColMap.toArray(new Integer[0]));
}
  /**
   * Get the data types of the no dictionary sort columns and no dictionary no-sort columns
   * of the table
   *
   * @param carbonTable
   * @return map with keys "noDictSortDataTypes" and "noDictNoSortDataTypes"
   */
public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(CarbonTable carbonTable) {
List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
List<DataType> noDictSortType = new ArrayList<>();
List<DataType> noDictNoSortType = new ArrayList<>();
for (int i = 0; i < dimensions.size(); i++) {
if (dimensions.get(i).getDataType() != DataTypes.DATE) {
if (dimensions.get(i).isSortColumn()) {
noDictSortType.add(dimensions.get(i).getDataType());
} else {
noDictNoSortType.add(dimensions.get(i).getDataType());
}
}
}
DataType[] noDictSortTypes = noDictSortType.toArray(new DataType[noDictSortType.size()]);
DataType[] noDictNoSortTypes = noDictNoSortType.toArray(new DataType[noDictNoSortType.size()]);
Map<String, DataType[]> noDictSortAndNoSortTypes = new HashMap<>(2);
noDictSortAndNoSortTypes.put("noDictSortDataTypes", noDictSortTypes);
noDictSortAndNoSortTypes.put("noDictNoSortDataTypes", noDictNoSortTypes);
return noDictSortAndNoSortTypes;
}
  /**
   * Get the data types of the no dictionary sort columns and no dictionary no-sort columns,
   * as per the dataFields order
   *
   * @param dataFields
   * @return map with keys "noDictSortDataTypes" and "noDictNoSortDataTypes"
   */
public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypesAsDataFieldOrder(
DataField[] dataFields) {
List<DataType> noDictSortType = new ArrayList<>();
List<DataType> noDictNoSortType = new ArrayList<>();
for (DataField dataField : dataFields) {
if (dataField.getColumn().isDimension()
&& dataField.getColumn().getColumnSchema().getDataType() != DataTypes.DATE) {
if (dataField.getColumn().getColumnSchema().isSortColumn()) {
noDictSortType.add(dataField.getColumn().getColumnSchema().getDataType());
} else {
noDictNoSortType.add(dataField.getColumn().getColumnSchema().getDataType());
}
}
}
DataType[] noDictSortTypes = noDictSortType.toArray(new DataType[noDictSortType.size()]);
DataType[] noDictNoSortTypes = noDictNoSortType.toArray(new DataType[noDictNoSortType.size()]);
Map<String, DataType[]> noDictSortAndNoSortTypes = new HashMap<>(2);
noDictSortAndNoSortTypes.put("noDictSortDataTypes", noDictSortTypes);
noDictSortAndNoSortTypes.put("noDictNoSortDataTypes", noDictNoSortTypes);
return noDictSortAndNoSortTypes;
}
  /**
   * This method will get the store location (segment path) for the given table and segment id
   *
   * @return data directory path
   */
public static String createCarbonStoreLocation(CarbonTable carbonTable, String segmentId) {
return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}
  /**
   * Initialise the data types of the measures as per their storage format
   */
  public static DataType[] initDataType(CarbonTable carbonTable, String tableName,
      int measureCount) {
    DataType[] type = new DataType[measureCount];
    List<CarbonMeasure> measures = carbonTable.getVisibleMeasures();
    for (int i = 0; i < measureCount; i++) {
      type[i] = measures.get(i).getDataType();
    }
    return type;
  }
  /**
   * Resolve the sort scope for the load, preferring the DDL option over the carbon property.
   * @param configuration
   * @return the resolved sort scope
   */
public static SortScopeOptions.SortScope getSortScope(CarbonDataLoadConfiguration configuration) {
SortScopeOptions.SortScope sortScope;
try {
// first check whether user input it from ddl, otherwise get from carbon properties
if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) == null) {
sortScope = SortScopeOptions.getSortScope(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
} else {
sortScope = SortScopeOptions.getSortScope(
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
.toString());
}
} catch (Exception e) {
sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
LOGGER.warn("Exception occured while resolving sort scope. " +
"sort scope is set to " + sortScope);
}
return sortScope;
}
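  /**
   * Resolve the sort scope from the given string, falling back to the carbon property when
   * it is null and to the default sort scope on any failure.
   */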
public static SortScopeOptions.SortScope getSortScope(String sortScopeString) {
SortScopeOptions.SortScope sortScope;
try {
// first check whether user input it from ddl, otherwise get from carbon properties
if (sortScopeString == null) {
sortScope = SortScopeOptions.getSortScope(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
} else {
sortScope = SortScopeOptions.getSortScope(sortScopeString);
}
LOGGER.info("sort scope is set to " + sortScope);
} catch (Exception e) {
sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
LOGGER.warn("Exception occured while resolving sort scope. " +
"sort scope is set to " + sortScope);
}
return sortScope;
}
/**
* Get the number of partitions in global sort
* @param globalSortPartitions
* @return the number of partitions
*/
public static int getGlobalSortPartitions(Object globalSortPartitions) {
int numPartitions;
try {
// First try to get the number from ddl, otherwise get it from carbon properties.
if (globalSortPartitions == null) {
numPartitions = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT));
} else {
numPartitions = Integer.parseInt(globalSortPartitions.toString());
}
} catch (Exception e) {
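      // any missing or unparsable value falls back to 0, leaving the partition count unspecified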
numPartitions = 0;
}
return numPartitions;
}
  /**
   * The method prepares and returns the message describing why a value is a bad record
   *
   * @param columnName
   * @param dataType
   * @return failure reason message
   */
public static String prepareFailureReason(String columnName, DataType dataType) {
return "The value with column name " + columnName + " and column data type " + dataType
.getName() + " is not a valid " + dataType + " type.";
}
  /**
   * This method returns a new array in which each element of the input array is appended
   * with the given {@code append} strings
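   * For example, {@code arrayAppend(new String[]{"/tmp/a", "/tmp/b"}, "_", "0")} returns
   * {@code {"/tmp/a_0", "/tmp/b_0"}}.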
* @param inputArr inputArr
* @param append strings to append
* @return result
*/
public static String[] arrayAppend(String[] inputArr, String... append) {
String[] outArr = new String[inputArr.length];
    StringBuilder sb = new StringBuilder();
for (String str : append) {
sb.append(str);
}
String appendStr = sb.toString();
for (int i = 0; i < inputArr.length; i++) {
outArr[i] = inputArr[i] + appendStr;
}
return outArr;
}
  /**
   * This method trims the error message by stripping the verbose parser hint and
   * parser configuration details (e.g. from a TextParsingException message)
   *
   * @param input
   * @return trimmed error message
   */
public static String trimErrorMessage(String input) {
String errorMessage = input;
if (input != null) {
if (input.split("Hint").length > 1) {
errorMessage = input.split("Hint")[0];
} else if (input.split("Parser Configuration:").length > 1) {
errorMessage = input.split("Parser Configuration:")[0];
}
}
return errorMessage;
}
  /**
   * The method returns true if either the bad records logger is enabled or the bad
   * records action is REDIRECT
   * @param configuration
   * @return true if raw data is required
   */
public static boolean isRawDataRequired(CarbonDataLoadConfiguration configuration) {
boolean isRawDataRequired = Boolean.parseBoolean(
configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
.toString());
// if logger is disabled then check if action is redirect then raw data will be required.
if (!isRawDataRequired) {
      Object badRecordsAction =
          configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION);
      if (null != badRecordsAction) {
        LoggerAction loggerAction = null;
        try {
          loggerAction = LoggerAction.valueOf(badRecordsAction.toString().toUpperCase());
} catch (IllegalArgumentException e) {
loggerAction = LoggerAction.FORCE;
}
isRawDataRequired = loggerAction == LoggerAction.REDIRECT;
}
}
return isRawDataRequired;
}
  /**
   * Partition the input iterators equally (round-robin) as per the number of threads.
   *
   * @return lists of input iterators, one list per parallel thread
   */
public static List<CarbonIterator<Object[]>>[] partitionInputReaderIterators(
CarbonIterator<Object[]>[] inputIterators, short sdkWriterCores) {
// Get the number of cores configured in property.
int numberOfCores;
if (sdkWriterCores > 0) {
numberOfCores = sdkWriterCores;
} else {
numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
}
// Get the minimum of number of cores and iterators size to get the number of parallel threads
// to be launched.
int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
if (parallelThreadNumber <= 0) {
parallelThreadNumber = 1;
}
List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
for (int i = 0; i < parallelThreadNumber; i++) {
iterators[i] = new ArrayList<>();
}
// Equally partition the iterators as per number of threads
for (int i = 0; i < inputIterators.length; i++) {
iterators[i % parallelThreadNumber].add(inputIterators[i]);
}
return iterators;
}
}