| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.sdk.file; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.carbondata.common.annotations.InterfaceAudience; |
| import org.apache.carbondata.common.annotations.InterfaceStability; |
| import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.datastore.impl.FileFactory; |
| import org.apache.carbondata.core.metadata.datatype.DataType; |
| import org.apache.carbondata.core.metadata.datatype.DataTypes; |
| import org.apache.carbondata.core.metadata.datatype.MapType; |
| import org.apache.carbondata.core.metadata.datatype.StructField; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.TableSchema; |
| import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; |
| import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.processing.loading.model.CarbonLoadModel; |
| import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder; |
| |
| import org.apache.hadoop.conf.Configuration; |
| |
| /** |
| * Builder for {@link CarbonWriter} |
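 * <p>
 * A minimal usage sketch for CSV input (the field names, values and output
 * path below are illustrative only):
 * <pre>{@code
 *   Field[] fields = new Field[2];
 *   fields[0] = new Field("name", DataTypes.STRING);
 *   fields[1] = new Field("age", DataTypes.INT);
 *   CarbonWriter writer = CarbonWriter.builder()
 *       .outputPath("/tmp/carbon_output")
 *       .withCsvInput(new Schema(fields))
 *       .build();
 *   writer.write(new String[]{"robot", "1"});
 *   writer.close();
 * }</pre>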
| */ |
| @InterfaceAudience.User |
| @InterfaceStability.Unstable |
| public class CarbonWriterBuilder { |
| private Schema schema; |
| private String path; |
| private String[] sortColumns; |
| private int blockletSize; |
| private int blockSize; |
| private long timestamp; |
| private Map<String, String> options; |
| private String taskNo; |
| private int localDictionaryThreshold; |
| private boolean isLocalDictionaryEnabled; |
| private short numOfThreads; |
| private Configuration hadoopConf; |
| private enum WRITER_TYPE { |
| CSV, AVRO, JSON |
| } |
| |
| private WRITER_TYPE writerType; |
| |
| /** |
| * Sets the output path of the writer builder |
   * @param path is the absolute path where the output files are written;
   * this method must be called before building the CarbonWriter
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder outputPath(String path) { |
| Objects.requireNonNull(path, "path should not be null"); |
| this.path = path; |
| return this; |
| } |
| |
| /** |
   * Sets the list of columns that need to be in sorted order
   * @param sortColumns is a string array of columns that need to be sorted.
   * If it is null (the default), all dimensions are selected for sorting.
   * If it is an empty array, no columns are sorted.
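   *
   * For example (a hedged sketch; "name" is an illustrative column name):
   * <pre>{@code
   *   builder.sortBy(new String[]{"name"});  // sort rows by the "name" column
   *   builder.sortBy(new String[0]);         // do not sort any column
   * }</pre>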
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder sortBy(String[] sortColumns) { |
| if (sortColumns != null) { |
| for (int i = 0; i < sortColumns.length; i++) { |
| sortColumns[i] = sortColumns[i].toLowerCase(); |
| } |
| } |
| this.sortColumns = sortColumns; |
| return this; |
| } |
| |
| /** |
   * Sets the taskNo for the writer. Concurrently running SDK writers should set
   * distinct taskNo values to avoid file name conflicts during write.
   * @param taskNo is the taskNo the user wants to specify;
   * by default it is the system time in nanoseconds.
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder taskNo(long taskNo) { |
| this.taskNo = String.valueOf(taskNo); |
| return this; |
| } |
| |
| /** |
   * To set the timestamp used in the carbondata and carbonindex files
   * @param timestamp is the timestamp to be used in the carbondata and carbonindex files;
   * by default (when not set) the system time in nanoseconds at build time is used.
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder uniqueIdentifier(long timestamp) { |
    if (timestamp < 0) {
      throw new IllegalArgumentException("Unique Identifier should not be negative");
    }
| this.timestamp = timestamp; |
| return this; |
| } |
| |
| /** |
   * To support the load options for the sdk writer
   * @param options key,value pair of load options.
   * Supported keys and values are:
   * a. bad_records_logger_enable -- true (write bad records into separate logs), false
   * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
   * c. bad_record_path -- path to write the bad records to
   * d. dateformat -- same as Java SimpleDateFormat
   * e. timestampformat -- same as Java SimpleDateFormat
   * f. complex_delimiter_level_1 -- value used to split complex type data
   * g. complex_delimiter_level_2 -- value used to split nested complex type data
   * h. quotechar
   * i. escapechar
   *
   * Default values are as follows:
   *
   * a. bad_records_logger_enable -- "false"
   * b. bad_records_action -- "FAIL"
   * c. bad_record_path -- ""
   * d. dateformat -- "", the value from carbon.properties is used
   * e. timestampformat -- "", the value from carbon.properties is used
   * f. complex_delimiter_level_1 -- "$"
   * g. complex_delimiter_level_2 -- ":"
   * h. quotechar -- "\""
   * i. escapechar -- "\\"
| * |
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder withLoadOptions(Map<String, String> options) { |
| Objects.requireNonNull(options, "Load options should not be null"); |
    // validate the options
    Set<String> supportedOptions = new HashSet<>(Arrays.asList(
        "bad_records_logger_enable", "bad_records_action", "bad_record_path", "dateformat",
        "timestampformat", "complex_delimiter_level_1", "complex_delimiter_level_2",
        "quotechar", "escapechar"));
    for (String option : options.keySet()) {
      if (!supportedOptions.contains(option.toLowerCase())) {
        throw new IllegalArgumentException("Unsupported option: " + option
            + ". Refer to the method header or documentation");
      }
    }
| |
| if (this.options == null) { |
| // convert it to treeMap as keys need to be case insensitive |
| this.options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); |
| } |
| this.options.putAll(options); |
| return this; |
| } |
| |
| /** |
   * To support the table properties for the sdk writer
   *
   * @param options key,value pair of create table properties.
   * Supported keys and values are:
   * a. table_blocksize -- [1-2048] value in MB. Default value is 1024
   * b. table_blocklet_size -- value in MB. Default value is 64 MB
   * c. local_dictionary_threshold -- positive value; default is 10000
   * d. local_dictionary_enable -- true / false. Default is false
   * e. sort_columns -- comma separated columns, e.g. "c1,c2".
   * By default all dimensions are sorted; if an empty string "" is passed,
   * no columns are sorted
   * f. sort_scope -- "local_sort", "no_sort", "batch_sort". Default value is "local_sort"
   * g. long_string_columns -- comma separated string columns whose values can exceed
   * 32000 characters in length. Default value is null
| * |
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder withTableProperties(Map<String, String> options) { |
| Objects.requireNonNull(options, "Table properties should not be null"); |
| Set<String> supportedOptions = new HashSet<>(Arrays |
| .asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold", |
| "local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns")); |
| |
| for (String key : options.keySet()) { |
| if (!supportedOptions.contains(key.toLowerCase())) { |
        throw new IllegalArgumentException(
            "Unsupported option: " + key + ". Refer to the method header or documentation");
| } |
| } |
| |
| for (Map.Entry<String, String> entry : options.entrySet()) { |
| if (entry.getKey().equalsIgnoreCase("table_blocksize")) { |
| this.withBlockSize(Integer.parseInt(entry.getValue())); |
| } else if (entry.getKey().equalsIgnoreCase("table_blocklet_size")) { |
| this.withBlockletSize(Integer.parseInt(entry.getValue())); |
| } else if (entry.getKey().equalsIgnoreCase("local_dictionary_threshold")) { |
| this.localDictionaryThreshold(Integer.parseInt(entry.getValue())); |
| } else if (entry.getKey().equalsIgnoreCase("local_dictionary_enable")) { |
| this.enableLocalDictionary((entry.getValue().equalsIgnoreCase("true"))); |
| } else if (entry.getKey().equalsIgnoreCase("sort_columns")) { |
| //sort columns |
| String[] sortColumns; |
| if (entry.getValue().trim().isEmpty()) { |
| sortColumns = new String[0]; |
| } else { |
| sortColumns = entry.getValue().split(","); |
| } |
| this.sortBy(sortColumns); |
| } else if (entry.getKey().equalsIgnoreCase("sort_scope")) { |
| this.withSortScope(entry); |
| } else if (entry.getKey().equalsIgnoreCase("long_string_columns")) { |
| updateToLoadOptions(entry); |
| } |
| } |
| return this; |
| } |
| |
| /** |
| * To make sdk writer thread safe. |
| * |
   * @param numOfThreads is the number of threads from which the writer will be
   * called concurrently.
   * By default the sdk writer is not thread safe:
   * one writer instance can be used in one thread only.
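   *
   * A hedged sketch (thread management is omitted; the value is illustrative):
   * <pre>{@code
   *   short numOfThreads = 4;
   *   CarbonWriter writer = builder.withThreadSafe(numOfThreads).build();
   *   // the same writer instance can now be shared by up to numOfThreads
   *   // threads, each calling writer.write(row)
   * }</pre>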
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder withThreadSafe(short numOfThreads) { |
| if (numOfThreads < 1) { |
| throw new IllegalArgumentException("number of threads cannot be lesser than 1. " |
| + "suggest to keep two times the number of cores available"); |
| } |
| this.numOfThreads = numOfThreads; |
| return this; |
| } |
| |
| /** |
| * To support hadoop configuration |
| * |
   * @param conf hadoop configuration; can be used to set the s3a access key, secret key,
   * endpoint and other configuration properties
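   *
   * A hedged sketch for an S3A target (the property keys are standard Hadoop
   * s3a configuration keys; the values are placeholders):
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   conf.set("fs.s3a.access.key", "<access-key>");
   *   conf.set("fs.s3a.secret.key", "<secret-key>");
   *   conf.set("fs.s3a.endpoint", "<endpoint>");
   *   builder.withHadoopConf(conf);
   * }</pre>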
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder withHadoopConf(Configuration conf) { |
| if (conf != null) { |
| this.hadoopConf = conf; |
| } |
| return this; |
| } |
| |
| /** |
   * To set the carbondata file (block) size, between 1 MB and 2048 MB
   * @param blockSize is the block size in MB, between 1 MB and 2048 MB;
   * default value is 1024 MB
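   *
   * A hedged example (values are illustrative; blocklets are subdivisions of a
   * block, so the blocklet size should be smaller than the block size):
   * <pre>{@code
   *   builder.withBlockSize(512)      // 512 MB carbondata files
   *          .withBlockletSize(32);   // 32 MB blocklets within each file
   * }</pre>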
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder withBlockSize(int blockSize) { |
| if (blockSize <= 0 || blockSize > 2048) { |
| throw new IllegalArgumentException("blockSize should be between 1 MB to 2048 MB"); |
| } |
| this.blockSize = blockSize; |
| return this; |
| } |
| |
| /** |
   * @param localDictionaryThreshold is the local dictionary threshold; default is 10000
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder localDictionaryThreshold(int localDictionaryThreshold) { |
| if (localDictionaryThreshold <= 0) { |
| throw new IllegalArgumentException( |
| "Local Dictionary Threshold should be greater than 0"); |
| } |
| this.localDictionaryThreshold = localDictionaryThreshold; |
| return this; |
| } |
| |
| /** |
   * @param enableLocalDictionary enable local dictionary, default is false
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder enableLocalDictionary(boolean enableLocalDictionary) { |
| this.isLocalDictionaryEnabled = enableLocalDictionary; |
| return this; |
| } |
| |
| /** |
   * To set the blocklet size of the CarbonData file
   * @param blockletSize is the blocklet size in MB;
   * default value is 64 MB
| * @return updated CarbonWriterBuilder |
| */ |
| public CarbonWriterBuilder withBlockletSize(int blockletSize) { |
| if (blockletSize <= 0) { |
| throw new IllegalArgumentException("blockletSize should be greater than zero"); |
| } |
| this.blockletSize = blockletSize; |
| return this; |
| } |
| |
| /** |
   * To build a {@link CarbonWriter}, which accepts rows in CSV format
   *
   * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
   * @return updated CarbonWriterBuilder
| */ |
| public CarbonWriterBuilder withCsvInput(Schema schema) { |
| Objects.requireNonNull(schema, "schema should not be null"); |
| this.schema = schema; |
| this.writerType = WRITER_TYPE.CSV; |
| return this; |
| } |
| |
| /** |
   * To build a {@link CarbonWriter}, which accepts Avro objects
| * |
| * @param avroSchema avro Schema object {org.apache.avro.Schema} |
   * @return updated CarbonWriterBuilder
| */ |
| public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) { |
| Objects.requireNonNull(avroSchema, "Avro schema should not be null"); |
| this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema); |
| this.writerType = WRITER_TYPE.AVRO; |
| return this; |
| } |
| |
| /** |
   * To build a {@link CarbonWriter}, which accepts JSON objects
| * |
| * @param carbonSchema carbon Schema object |
   * @return updated CarbonWriterBuilder
| */ |
| public CarbonWriterBuilder withJsonInput(Schema carbonSchema) { |
| Objects.requireNonNull(carbonSchema, "schema should not be null"); |
| this.schema = carbonSchema; |
| this.writerType = WRITER_TYPE.JSON; |
| return this; |
| } |
| |
| /** |
   * Build a {@link CarbonWriter}.
   * The built writer is not thread safe;
   * use the withThreadSafe() configuration in a multi-thread environment.
   *
   * @return CarbonWriter (AvroCarbonWriter, CSVCarbonWriter or JsonCarbonWriter,
   * based on the input type)
   * @throws IOException if the writer type is not set or the writer cannot be created
   * @throws InvalidLoadOptionException if any configured load option or table property is invalid
| */ |
| public CarbonWriter build() throws IOException, InvalidLoadOptionException { |
| Objects.requireNonNull(path, "path should not be null"); |
| if (this.writerType == null) { |
| throw new IOException( |
| "Writer type is not set, use withCsvInput() or withAvroInput() or withJsonInput() " |
| + "API based on input"); |
| } |
| CarbonLoadModel loadModel = buildLoadModel(schema); |
| loadModel.setSdkWriterCores(numOfThreads); |
| if (hadoopConf == null) { |
| hadoopConf = FileFactory.getConfiguration(); |
| } |
| if (this.writerType == WRITER_TYPE.AVRO) { |
      // Avro records are pushed to Carbon as Objects, not as Strings, in order to
      // support multi-level complex types. Since no conversion is needed, the
      // converter step is removed from the load: the loadWithoutConverterStep flag
      // tells the loader builder to skip the conversion step.
| loadModel.setLoadWithoutConverterStep(true); |
| return new AvroCarbonWriter(loadModel, hadoopConf); |
| } else if (this.writerType == WRITER_TYPE.JSON) { |
| loadModel.setJsonFileLoad(true); |
| return new JsonCarbonWriter(loadModel, hadoopConf); |
| } else { |
| // CSV |
| return new CSVCarbonWriter(loadModel, hadoopConf); |
| } |
| } |
| |
| private void setCsvHeader(CarbonLoadModel model) { |
| Field[] fields = schema.getFields(); |
| StringBuilder builder = new StringBuilder(); |
    List<String> columns = new ArrayList<>(fields.length);
    for (Field field : fields) {
      // fields may contain holes (null entries), e.g. when the user supplied
      // an array larger than the number of fields actually set
      if (null != field) {
        builder.append(field.getFieldName());
        builder.append(",");
        columns.add(field.getFieldName());
      }
    }
    String header = builder.toString();
    model.setCsvHeader(header.substring(0, header.length() - 1));
    model.setCsvHeaderColumns(columns.toArray(new String[0]));
| } |
| |
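  /**
   * Build a {@link CarbonLoadModel} from the given schema, applying the configured
   * table properties and load options. Any long string columns are validated and
   * their data type is changed from string to varchar before the model is built.
   */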
| public CarbonLoadModel buildLoadModel(Schema carbonSchema) |
| throws IOException, InvalidLoadOptionException { |
    if (timestamp == 0) {
      // no unique identifier supplied; default to the system time in nanoseconds
      timestamp = System.nanoTime();
    }
| // validate long_string_column |
| Set<String> longStringColumns = new HashSet<>(); |
| if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) { |
| String[] specifiedLongStrings = |
| options.get(CarbonCommonConstants.LONG_STRING_COLUMNS).toLowerCase().split(","); |
| for (String str : specifiedLongStrings) { |
| longStringColumns.add(str.trim()); |
| } |
| validateLongStringColumns(carbonSchema, longStringColumns); |
| } |
    // for long string fields, change the data type from string to varchar
| this.schema = updateSchemaFields(carbonSchema, longStringColumns); |
| // build CarbonTable using schema |
| CarbonTable table = buildCarbonTable(); |
| // build LoadModel |
| return buildLoadModel(table, timestamp, taskNo, options); |
| } |
| |
| private void validateLongStringColumns(Schema carbonSchema, Set<String> longStringColumns) { |
| // long string columns must be string or varchar type |
    for (Field field : carbonSchema.getFields()) {
      if (field != null && longStringColumns.contains(field.getFieldName().toLowerCase())
          && field.getDataType() != DataTypes.STRING
          && field.getDataType() != DataTypes.VARCHAR) {
        throw new RuntimeException(
            "long string column: " + field.getFieldName() + " is not supported for data type: "
                + field.getDataType());
      }
| } |
| // long string columns must not be present in sort columns |
| if (sortColumns != null) { |
| for (String col : sortColumns) { |
| // already will be in lower case |
| if (longStringColumns.contains(col)) { |
          throw new RuntimeException(
              "long string column: " + col + " must not be present in sort columns");
| } |
| } |
| } |
| } |
| |
| /** |
| * Build a {@link CarbonTable} |
| */ |
| private CarbonTable buildCarbonTable() { |
| TableSchemaBuilder tableSchemaBuilder = TableSchema.builder(); |
| if (blockSize > 0) { |
| tableSchemaBuilder = tableSchemaBuilder.blockSize(blockSize); |
| } |
| |
| if (blockletSize > 0) { |
| tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize); |
| } |
| tableSchemaBuilder.enableLocalDictionary(isLocalDictionaryEnabled); |
| tableSchemaBuilder.localDictionaryThreshold(localDictionaryThreshold); |
| List<String> sortColumnsList = new ArrayList<>(); |
| if (sortColumns == null) { |
      // If sort columns are not specified, default all dimensions to sort columns.
      // When dimensions are defaulted to sort columns,
      // inverted index is supported for them by default.
      // The null check on field handles holes in fields[], e.g. when the
      // user passed an array of size 4 but supplied only 2 fields.
| for (Field field : schema.getFields()) { |
| if (null != field) { |
| if (field.getDataType() == DataTypes.STRING || |
| field.getDataType() == DataTypes.DATE || |
| field.getDataType() == DataTypes.TIMESTAMP) { |
| sortColumnsList.add(field.getFieldName()); |
| } |
| } |
| } |
| sortColumns = new String[sortColumnsList.size()]; |
| sortColumns = sortColumnsList.toArray(sortColumns); |
| } else { |
| sortColumnsList = Arrays.asList(sortColumns); |
| } |
| ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()]; |
| Field[] fields = schema.getFields(); |
| buildTableSchema(fields, tableSchemaBuilder, sortColumnsList, sortColumnsSchemaList); |
| |
| tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList)); |
    String dbName = "";
    String tableName = "_tempTable_" + timestamp;
    TableSchema tableSchema = tableSchemaBuilder.build();
    tableSchema.setTableName(tableName);
    CarbonTable table = CarbonTable.builder()
        .tableName(tableSchema.getTableName())
        .databaseName(dbName)
        .tablePath(path)
        .tableSchema(tableSchema)
        .isTransactionalTable(false)
        .build();
| return table; |
| } |
| |
| private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder, |
| List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) { |
| Set<String> uniqueFields = new HashSet<>(); |
    // a counter used for complex array types: the valIndex is assigned to the
    // children of complex array types in the order val1, val2, ... so that each
    // array child is uniquely identified at any nesting level
| AtomicInteger valIndex = new AtomicInteger(0); |
| // Check if any of the columns specified in sort columns are missing from schema. |
| for (String sortColumn: sortColumnsList) { |
| boolean exists = false; |
      for (Field field : fields) {
        // fields may contain null holes, so guard before comparing names
        if (field != null && field.getFieldName().equalsIgnoreCase(sortColumn)) {
          exists = true;
          break;
        }
      }
| if (!exists) { |
| throw new RuntimeException( |
| "column: " + sortColumn + " specified in sort columns does not exist in schema"); |
| } |
| } |
| for (Field field : fields) { |
| if (null != field) { |
| if (!uniqueFields.add(field.getFieldName())) { |
| throw new RuntimeException( |
| "Duplicate column " + field.getFieldName() + " found in table schema"); |
| } |
        int sortIndex = sortColumnsList.indexOf(field.getFieldName());
        if (sortIndex > -1) {
          // sort columns are not supported for array, struct, map, double, float,
          // decimal and varchar data types
          if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT
              || DataTypes.isDecimal(field.getDataType()) || field.getDataType().isComplexType()
              || field.getDataType() == DataTypes.VARCHAR) {
            throw new RuntimeException(
                "sort columns not supported for array, struct, map, double, float, decimal, "
                    + "varchar");
          }
        }
| if (field.getChildren() != null && field.getChildren().size() > 0) { |
| if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { |
            // wrap the single child column's data type in an ArrayType
| DataType complexType = |
| DataTypes.createArrayType(field.getChildren().get(0).getDataType()); |
| tableSchemaBuilder |
| .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false); |
| } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) { |
            // loop through the inner columns and form a StructType
| List<StructField> structFieldsArray = |
| new ArrayList<StructField>(field.getChildren().size()); |
| for (StructField childFld : field.getChildren()) { |
| structFieldsArray |
| .add(new StructField(childFld.getFieldName(), childFld.getDataType())); |
| } |
| DataType complexType = DataTypes.createStructType(structFieldsArray); |
| tableSchemaBuilder |
| .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false); |
| } else if (field.getDataType().getName().equalsIgnoreCase("MAP")) { |
            // form a MapType from the key type and the single child (value) column
| DataType mapType = DataTypes.createMapType(((MapType) field.getDataType()).getKeyType(), |
| field.getChildren().get(0).getDataType()); |
| tableSchemaBuilder |
| .addColumn(new StructField(field.getFieldName(), mapType), valIndex, false); |
| } |
| } else { |
          ColumnSchema columnSchema = tableSchemaBuilder
              .addColumn(new StructField(field.getFieldName(), field.getDataType()),
                  valIndex, sortIndex > -1);
          if (sortIndex > -1) {
            columnSchema.setSortColumn(true);
            sortColumnsSchemaList[sortIndex] = columnSchema;
          }
| } |
| } |
| } |
| } |
| |
| /** |
| * Build a {@link CarbonLoadModel} |
| */ |
| private CarbonLoadModel buildLoadModel(CarbonTable table, long timestamp, String taskNo, |
| Map<String, String> options) throws InvalidLoadOptionException, IOException { |
| if (options == null) { |
| options = new HashMap<>(); |
| } |
| CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table); |
| CarbonLoadModel build = builder.build(options, timestamp, taskNo); |
| setCsvHeader(build); |
| return build; |
| } |
| |
  /* Loop through all the parent columns and:
     a) change field names to lower case,
        to match the sort column case;
     b) change long string fields from string to varchar type. */
| private Schema updateSchemaFields(Schema schema, Set<String> longStringColumns) { |
| if (schema == null) { |
| return null; |
| } |
| Field[] fields = schema.getFields(); |
| for (int i = 0; i < fields.length; i++) { |
| if (fields[i] != null) { |
| fields[i].updateNameToLowerCase(); |
| if (longStringColumns != null) { |
| /* Also update the string type to varchar */ |
| if (longStringColumns.contains(fields[i].getFieldName())) { |
| fields[i].updateDataTypeToVarchar(); |
| } |
| } |
| } |
| } |
| return new Schema(fields); |
| } |
| |
| private void updateToLoadOptions(Map.Entry<String, String> entry) { |
| if (this.options == null) { |
| // convert it to treeMap as keys need to be case insensitive |
| this.options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); |
| } |
| // update it to load options |
| this.options.put(entry.getKey(), entry.getValue()); |
| } |
| |
| private void withSortScope(Map.Entry<String, String> entry) { |
| String sortScope = entry.getValue(); |
| if (sortScope != null) { |
| if ((!CarbonUtil.isValidSortOption(sortScope))) { |
| throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope); |
| } else if (sortScope.equalsIgnoreCase("global_sort")) { |
| throw new IllegalArgumentException("global sort is not supported"); |
| } |
| } |
| // update it to load options |
| updateToLoadOptions(entry); |
| } |
| } |