/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.sdk.file;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.MapType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;
import org.apache.hadoop.conf.Configuration;
/**
* Builder for {@link CarbonWriter}
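* <p>
* A minimal usage sketch (illustrative; the output path, application name and column
* names/values below are hypothetical):
* <pre>{@code
*   Field[] fields = new Field[2];
*   fields[0] = new Field("name", DataTypes.STRING);
*   fields[1] = new Field("age", DataTypes.INT);
*   CarbonWriter writer = CarbonWriter.builder()
*       .outputPath("/tmp/carbon_output")
*       .withCsvInput(new Schema(fields))
*       .writtenBy("MyApp")
*       .build();
*   writer.write(new String[]{"robot", "1"});
*   writer.close();
* }</pre>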
*/
@InterfaceAudience.User
@InterfaceStability.Unstable
public class CarbonWriterBuilder {
private Schema schema;
private String path;
// initialize with an empty array, as no columns should be selected for sorting in NO_SORT
private String[] sortColumns = new String[0];
private int blockletSize;
private int pageSizeInMb;
private int blockSize;
private long timestamp;
// use TreeMap as keys need to be case insensitive
private Map<String, String> options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private String taskNo;
private int localDictionaryThreshold;
private boolean isLocalDictionaryEnabled = Boolean.parseBoolean(
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT);
private short numOfThreads;
private Configuration hadoopConf;
private String writtenByApp;
private String[] invertedIndexColumns;
private enum WRITER_TYPE {
CSV, AVRO, JSON
}
private WRITER_TYPE writerType;
// can be set by withSchemaFile
private CarbonTable carbonTable;
/**
* Sets the output path of the writer builder
*
* @param path is the absolute path where output files are written
* This method must be called when building CarbonWriterBuilder
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder outputPath(String path) {
Objects.requireNonNull(path, "path should not be null");
this.path = path;
return this;
}
/**
* sets the list of columns that need to be in sorted order
*
* @param sortColumns is a string array of columns that need to be sorted.
* If it is null, all dimensions are selected for sorting by default.
* If it is an empty array, no columns are sorted
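* <p>
* Example (illustrative; 'builder' is a CarbonWriterBuilder instance, column names are
* hypothetical):
* <pre>{@code
*   builder.sortBy(new String[]{"name", "age"});   // sort by name and age
*   builder.sortBy(new String[0]);                 // no columns are sorted
* }</pre>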
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder sortBy(String[] sortColumns) {
if (sortColumns != null) {
for (int i = 0; i < sortColumns.length; i++) {
sortColumns[i] = sortColumns[i].toLowerCase().trim();
}
}
this.sortColumns = sortColumns;
return this;
}
/**
* sets the list of columns for which an inverted index needs to be generated
*
* @param invertedIndexColumns is a string array of columns for which an inverted index needs
* to be generated.
* If it is null or an empty array, inverted index will be generated for none of the columns
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder invertedIndexFor(String[] invertedIndexColumns) {
if (invertedIndexColumns != null) {
for (int i = 0; i < invertedIndexColumns.length; i++) {
invertedIndexColumns[i] = invertedIndexColumns[i].toLowerCase().trim();
}
}
this.invertedIndexColumns = invertedIndexColumns;
return this;
}
/**
* sets the taskNo for the writer. Concurrently running SDK writers
* should set distinct taskNo values to avoid conflicts in file names during write.
*
* @param taskNo is the taskNo the user wants to specify.
* by default it is the system time in nanoseconds.
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder taskNo(long taskNo) {
this.taskNo = String.valueOf(taskNo);
return this;
}
/**
* to set the timestamp in the carbondata and carbonindex files
*
* @param timestamp is a timestamp to be used in the carbondata and carbonindex files.
* By default set to zero.
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder uniqueIdentifier(long timestamp) {
Objects.requireNonNull(timestamp, "Unique Identifier should not be null");
this.timestamp = timestamp;
return this;
}
/**
* To support the load options for sdk writer
*
* @param options key,value pair of load options.
* supported keys and values are
* a. bad_records_logger_enable -- true (write into separate logs), false
* b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
* c. bad_record_path -- path
* d. dateformat -- same as JAVA SimpleDateFormat
* e. timestampformat -- same as JAVA SimpleDateFormat
* f. complex_delimiter_level_1 -- value to split the complexTypeData
* g. complex_delimiter_level_2 -- value to split the nested complexTypeData
* h. quotechar
* i. escapechar
* j. fileheader
* k. complex_delimiter_level_3 -- value to split the complexTypeData nested at the next level
* l. binary_decoder -- Base64 or Hex, decoder for binary data
* <p>
* Default values are as follows.
* <p>
* a. bad_records_logger_enable -- "false"
* b. bad_records_action -- "FAIL"
* c. bad_record_path -- ""
* d. dateformat -- "", uses the value from the carbon.properties file
* e. timestampformat -- "", uses the value from the carbon.properties file
* f. complex_delimiter_level_1 -- "\001"
* g. complex_delimiter_level_2 -- "\002"
* h. quotechar -- "\""
* i. escapechar -- "\\"
* j. fileheader -- None
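* <p>
* Example (illustrative; 'builder' is a CarbonWriterBuilder instance and the bad record
* path shown is hypothetical):
* <pre>{@code
*   Map<String, String> loadOptions = new HashMap<>();
*   loadOptions.put("bad_records_logger_enable", "true");
*   loadOptions.put("bad_records_action", "REDIRECT");
*   loadOptions.put("bad_record_path", "/tmp/bad_records");
*   loadOptions.put("dateformat", "yyyy-MM-dd");
*   builder.withLoadOptions(loadOptions);
* }</pre>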
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
Objects.requireNonNull(options, "Load options should not be null");
//validate the options.
for (String option : options.keySet()) {
if (!option.equalsIgnoreCase("bad_records_logger_enable") &&
!option.equalsIgnoreCase("bad_records_action") &&
!option.equalsIgnoreCase("bad_record_path") &&
!option.equalsIgnoreCase("dateformat") &&
!option.equalsIgnoreCase("timestampformat") &&
!option.equalsIgnoreCase("complex_delimiter_level_1") &&
!option.equalsIgnoreCase("complex_delimiter_level_2") &&
!option.equalsIgnoreCase("complex_delimiter_level_3") &&
!option.equalsIgnoreCase("quotechar") &&
!option.equalsIgnoreCase("escapechar") &&
!option.equalsIgnoreCase("binary_decoder") &&
!option.equalsIgnoreCase("fileheader")) {
throw new IllegalArgumentException("Unsupported option:" + option
+ ". Refer method header or documentation");
}
}
for (Map.Entry<String, String> entry : options.entrySet()) {
if (entry.getKey().equalsIgnoreCase("bad_records_action")) {
try {
LoggerAction.valueOf(entry.getValue().toUpperCase());
} catch (Exception e) {
throw new IllegalArgumentException(
"option BAD_RECORDS_ACTION can have only either " +
"FORCE or IGNORE or REDIRECT or FAIL. It shouldn't be " + entry.getValue());
}
} else if (entry.getKey().equalsIgnoreCase("bad_records_logger_enable")) {
boolean isValid;
isValid = CarbonUtil.validateBoolean(entry.getValue());
if (!isValid) {
throw new IllegalArgumentException("Invalid value "
+ entry.getValue() + " for key " + entry.getKey());
}
} else if (entry.getKey().equalsIgnoreCase("quotechar")) {
String quoteChar = entry.getValue();
if (quoteChar.length() > 1) {
throw new IllegalArgumentException("QUOTECHAR cannot be more than one character.");
}
} else if (entry.getKey().equalsIgnoreCase("escapechar")) {
String escapeChar = entry.getValue();
if (escapeChar.length() > 1 && !CarbonLoaderUtil.isValidEscapeSequence(escapeChar)) {
throw new IllegalArgumentException("ESCAPECHAR cannot be more than one character.");
}
} else if (entry.getKey().equalsIgnoreCase("binary_decoder")) {
String binaryDecoderChar = entry.getValue();
if (binaryDecoderChar.length() > 1 &&
!CarbonLoaderUtil.isValidBinaryDecoder(binaryDecoderChar)) {
throw new IllegalArgumentException("Binary decoder only support Base64, " +
"Hex or no decode for string, don't support " + binaryDecoderChar);
}
}
}
this.options.putAll(options);
return this;
}
/**
* To support the load options for sdk writer
*
* @param key the key of load option
* @param value the value of load option
* @return updated CarbonWriterBuilder object
*/
public CarbonWriterBuilder withLoadOption(String key, String value) {
Objects.requireNonNull(key, "key of load properties should not be null");
Objects.requireNonNull(key, "value of load properties should not be null");
Map map = new HashMap();
map.put(key, value);
withLoadOptions(map);
return this;
}
/**
* To support the carbon table for sdk writer
*
* @param table carbon table
* @return CarbonWriterBuilder object
*/
public CarbonWriterBuilder withTable(CarbonTable table) {
Objects.requireNonNull(table, "Table should not be null");
this.carbonTable = table;
return this;
}
/**
* To support the table properties for sdk writer
*
* @param options key,value pair of create table properties.
* supported keys and values are
* a. table_blocksize -- [1-2048] values in MB. Default value is 1024
* b. table_blocklet_size -- values in MB. Default value is 64 MB
* c. local_dictionary_threshold -- positive value, default is 10000
* d. local_dictionary_enable -- true / false. Default is false
* e. sort_columns -- comma separated columns, for example "c1,c2". By default all dimensions
* are sorted. If an empty string "" is passed, no columns are sorted
* f. sort_scope -- "local_sort", "no_sort", "batch_sort". Default value is "local_sort"
* g. long_string_columns -- comma separated string columns whose values can exceed 32000
* characters in length. Default value is null.
* h. inverted_index -- comma separated string columns for which an inverted index needs to be
* generated
* i. table_page_size_inmb -- [1-1755] MB.
*
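* <p>
* Example (illustrative; 'builder' is a CarbonWriterBuilder instance, column names are
* hypothetical):
* <pre>{@code
*   Map<String, String> tableProperties = new HashMap<>();
*   tableProperties.put("table_blocksize", "256");
*   tableProperties.put("sort_columns", "name,age");
*   tableProperties.put("local_dictionary_enable", "true");
*   builder.withTableProperties(tableProperties);
* }</pre>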
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withTableProperties(Map<String, String> options) {
Objects.requireNonNull(options, "Table properties should not be null");
Set<String> supportedOptions = new HashSet<>(Arrays
.asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
"local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns",
"inverted_index", "table_page_size_inmb"));
for (String key : options.keySet()) {
if (!supportedOptions.contains(key.toLowerCase())) {
throw new IllegalArgumentException(
"Unsupported option: " + key + ". Refer method header or documentation");
}
}
for (Map.Entry<String, String> entry : options.entrySet()) {
if (entry.getKey().equalsIgnoreCase("table_blocksize")) {
this.withBlockSize(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equalsIgnoreCase("table_blocklet_size")) {
this.withBlockletSize(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equalsIgnoreCase("local_dictionary_threshold")) {
this.localDictionaryThreshold(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equalsIgnoreCase("local_dictionary_enable")) {
this.enableLocalDictionary((entry.getValue().equalsIgnoreCase("true")));
} else if (entry.getKey().equalsIgnoreCase("sort_columns")) {
//sort columns
String[] sortColumns;
if (entry.getValue().trim().isEmpty()) {
sortColumns = new String[0];
} else {
sortColumns = entry.getValue().split(",");
}
this.sortBy(sortColumns);
} else if (entry.getKey().equalsIgnoreCase("sort_scope")) {
this.withSortScope(entry);
} else if (entry.getKey().equalsIgnoreCase("long_string_columns")) {
updateToLoadOptions(entry);
} else if (entry.getKey().equalsIgnoreCase("inverted_index")) {
//inverted index columns
String[] invertedIndexColumns;
if (entry.getValue().trim().isEmpty()) {
invertedIndexColumns = new String[0];
} else {
invertedIndexColumns = entry.getValue().split(",");
}
this.invertedIndexFor(invertedIndexColumns);
} else if (entry.getKey().equalsIgnoreCase("table_page_size_inmb")) {
this.withPageSizeInMb(Integer.parseInt(entry.getValue()));
}
}
return this;
}
/**
* To support the table properties for sdk writer
*
* @param key property key
* @param value property value
* @return CarbonWriterBuilder object
*/
public CarbonWriterBuilder withTableProperty(String key, String value) {
Objects.requireNonNull(key, "key of table properties should not be null");
Objects.requireNonNull(key, "value of table properties should not be null");
Map map = new HashMap();
map.put(key, value);
withTableProperties(map);
return this;
}
/**
* To make the SDK writer thread safe.
*
* @param numOfThreads number of threads from which the writer will be called in a
* multi-thread scenario. By default the SDK writer is not thread safe;
* one writer instance can be used in one thread only.
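* <p>
* Example (illustrative): {@code builder.withThreadSafe((short) 4);} lets one writer
* instance be used concurrently from up to 4 threads.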
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withThreadSafe(short numOfThreads) {
if (numOfThreads < 1) {
throw new IllegalArgumentException("number of threads cannot be lesser than 1. "
+ "suggest to keep two times the number of cores available");
}
this.numOfThreads = numOfThreads;
return this;
}
/**
* To support hadoop configuration
*
* @param conf hadoop configuration; can be used to set S3A access key, secret key,
* endpoint and other configuration
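* <p>
* Example (illustrative; the S3 credentials and endpoint below are placeholders):
* <pre>{@code
*   Configuration conf = new Configuration();
*   conf.set("fs.s3a.access.key", "<access-key>");
*   conf.set("fs.s3a.secret.key", "<secret-key>");
*   conf.set("fs.s3a.endpoint", "<endpoint>");
*   builder.withHadoopConf(conf);
* }</pre>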
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withHadoopConf(Configuration conf) {
if (conf != null) {
this.hadoopConf = conf;
}
return this;
}
/**
* Updates the hadoop configuration with the given key value
*
* @param key configuration key
* @param value configuration value
* @return this object
*/
public CarbonWriterBuilder withHadoopConf(String key, String value) {
if (this.hadoopConf == null) {
this.hadoopConf = new Configuration(true);
}
this.hadoopConf.set(key, value);
return this;
}
/**
* To set the carbondata file size in MB between 1MB-2048MB
*
* @param blockSize is size in MB between 1MB to 2048 MB
* default value is 1024 MB
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withBlockSize(int blockSize) {
if (blockSize <= 0 || blockSize > 2048) {
throw new IllegalArgumentException("blockSize should be between 1 MB to 2048 MB");
}
this.blockSize = blockSize;
return this;
}
/**
* @param localDictionaryThreshold is localDictionaryThreshold, default is 10000
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder localDictionaryThreshold(int localDictionaryThreshold) {
if (localDictionaryThreshold <= 0) {
throw new IllegalArgumentException(
"Local Dictionary Threshold should be greater than 0");
}
this.localDictionaryThreshold = localDictionaryThreshold;
return this;
}
/**
* @param appName name of the application which is writing the carbondata files
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder writtenBy(String appName) {
this.writtenByApp = appName;
return this;
}
/**
* @param enableLocalDictionary enable local dictionary, default is false
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder enableLocalDictionary(boolean enableLocalDictionary) {
this.isLocalDictionaryEnabled = enableLocalDictionary;
return this;
}
/**
* To set the blocklet size of CarbonData file
*
* @param blockletSize is blocklet size in MB
* default value is 64 MB
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withBlockletSize(int blockletSize) {
if (blockletSize <= 0) {
throw new IllegalArgumentException("blockletSize should be greater than zero");
}
this.blockletSize = blockletSize;
return this;
}
/**
* To set the page size of CarbonData file
*
* @param pageSizeInMb is page size in MB
*
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withPageSizeInMb(int pageSizeInMb) {
if (pageSizeInMb < 1 || pageSizeInMb > 1755) {
throw new IllegalArgumentException("pageSizeInMb must be 1 MB - 1755 MB");
}
this.pageSizeInMb = pageSizeInMb;
return this;
}
/**
* to build a {@link CarbonWriter}, which accepts rows in CSV format
*
* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
* @return CarbonWriterBuilder
*/
public CarbonWriterBuilder withCsvInput(Schema schema) {
Objects.requireNonNull(schema, "schema should not be null");
if (this.schema != null) {
throw new IllegalArgumentException("schema should be set only once");
}
this.schema = schema;
this.writerType = WRITER_TYPE.CSV;
return this;
}
/**
* to build a {@link CarbonWriter}, which accepts rows in CSV format
*
* @return CarbonWriterBuilder
*/
public CarbonWriterBuilder withCsvInput() {
this.writerType = WRITER_TYPE.CSV;
return this;
}
/**
* to build a {@link CarbonWriter}, which accepts rows in CSV format
*
* @param jsonSchema json Schema string
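* <p>
* A sketch of the expected JSON schema format (illustrative; column names are hypothetical):
* <pre>{@code
*   String jsonSchema = "[{\"name\":\"string\"}, {\"age\":\"int\"}]";
*   builder.withCsvInput(jsonSchema);
* }</pre>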
* @return CarbonWriterBuilder
*/
public CarbonWriterBuilder withCsvInput(String jsonSchema) {
Objects.requireNonNull(jsonSchema, "schema should not be null");
if (this.schema != null) {
throw new IllegalArgumentException("schema should be set only once");
}
this.schema = Schema.parseJson(jsonSchema);
this.writerType = WRITER_TYPE.CSV;
return this;
}
/**
* to build a {@link CarbonWriter}, which accepts Avro object
*
* @param avroSchema avro Schema object {org.apache.avro.Schema}
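* <p>
* Example (illustrative; the record and field names are hypothetical):
* <pre>{@code
*   String avroSchemaJson = "{\"type\":\"record\",\"name\":\"person\",\"fields\":"
*       + "[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";
*   org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaJson);
*   builder.withAvroInput(avroSchema);
* }</pre>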
* @return CarbonWriterBuilder
*/
public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) {
Objects.requireNonNull(avroSchema, "Avro schema should not be null");
if (this.schema != null) {
throw new IllegalArgumentException("schema should be set only once");
}
this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
this.writerType = WRITER_TYPE.AVRO;
return this;
}
/**
* to build a {@link CarbonWriter}, which accepts rows in JSON format
*
* @param carbonSchema carbon Schema object
* @return CarbonWriterBuilder
*/
public CarbonWriterBuilder withJsonInput(Schema carbonSchema) {
Objects.requireNonNull(carbonSchema, "schema should not be null");
if (this.schema != null) {
throw new IllegalArgumentException("schema should be set only once");
}
this.schema = carbonSchema;
this.writerType = WRITER_TYPE.JSON;
return this;
}
/**
* to build a {@link CarbonWriter}, which accepts rows in JSON format
*
* @return CarbonWriterBuilder
*/
public CarbonWriterBuilder withJsonInput() {
this.writerType = WRITER_TYPE.JSON;
return this;
}
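/**
* To build the writer using the schema read from an existing carbon schema file.
* Note: outputPath() must be set before calling this method.
*
* @param schemaFilePath absolute path of the carbon schema file
* @return updated CarbonWriterBuilder
* @throws IOException if the schema file cannot be read
*/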
public CarbonWriterBuilder withSchemaFile(String schemaFilePath) throws IOException {
Objects.requireNonNull(schemaFilePath, "schema file path should not be null");
if (path == null) {
throw new IllegalArgumentException("output path should be set before setting schema file");
}
carbonTable = SchemaReader.readCarbonTableFromSchema(schemaFilePath, new Configuration());
carbonTable.getTableInfo().setTablePath(path);
carbonTable.setTransactionalTable(false);
List<ColumnSchema> columnSchemas =
carbonTable.getCreateOrderColumn().stream().map(
CarbonColumn::getColumnSchema
).collect(Collectors.toList());
schema = new Schema(columnSchemas);
return this;
}
/**
* Build a {@link CarbonWriter}
* This writer is not thread safe;
* use the withThreadSafe() configuration in a multi-thread environment
*
* @return CarbonWriter {AvroCarbonWriter/CSVCarbonWriter/JsonCarbonWriter based on Input Type }
* @throws IOException
* @throws InvalidLoadOptionException
*/
public CarbonWriter build() throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(path, "path should not be null");
if (this.writerType == null) {
throw new RuntimeException(
"'writerType' must be set, use withCsvInput() or withAvroInput() or withJsonInput() "
+ "API based on input");
}
if (this.writtenByApp == null || this.writtenByApp.isEmpty()) {
throw new RuntimeException(
"'writtenBy' must be set when writing carbon files, use writtenBy() API to "
+ "set it, it can be the name of the application which is using the SDK");
}
if (this.schema == null) {
throw new RuntimeException("schema should be set");
}
CarbonLoadModel loadModel = buildLoadModel(schema);
loadModel.setSdkWriterCores(numOfThreads);
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, writtenByApp);
if (hadoopConf == null) {
hadoopConf = FileFactory.getConfiguration();
}
if (this.writerType == WRITER_TYPE.AVRO) {
// AVRO records are pushed to Carbon as Objects, not as Strings. This was done in order to
// handle multi-level complex type support. As no conversion is needed, the converter step is
// removed from the load. The loadWithoutConverterStep flag tells the loader builder
// to skip the conversion step.
loadModel.setLoadWithoutConverterStep(true);
return new AvroCarbonWriter(loadModel, hadoopConf);
} else if (this.writerType == WRITER_TYPE.JSON) {
loadModel.setJsonFileLoad(true);
return new JsonCarbonWriter(loadModel, hadoopConf);
} else {
// CSV
return new CSVCarbonWriter(loadModel, hadoopConf);
}
}
private void setCsvHeader(CarbonLoadModel model) {
Field[] fields = schema.getFields();
StringBuilder builder = new StringBuilder();
String[] columns = new String[fields.length];
int i = 0;
for (Field field : fields) {
if (null != field) {
builder.append(field.getFieldName());
builder.append(",");
columns[i++] = field.getFieldName();
}
}
String header = builder.toString();
model.setCsvHeader(header.substring(0, header.length() - 1));
model.setCsvHeaderColumns(columns);
}
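/**
* Build a {@link CarbonLoadModel} from the given carbon schema,
* applying the configured table properties and load options.
* Normally invoked internally by build().
*
* @param carbonSchema carbon Schema object
* @return CarbonLoadModel for the load
* @throws IOException
* @throws InvalidLoadOptionException
*/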
public CarbonLoadModel buildLoadModel(Schema carbonSchema)
throws IOException, InvalidLoadOptionException {
timestamp = System.currentTimeMillis();
// validate long_string_column
Set<String> longStringColumns = new HashSet<>();
if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {
String[] specifiedLongStrings =
options.get(CarbonCommonConstants.LONG_STRING_COLUMNS).toLowerCase().split(",");
for (String str : specifiedLongStrings) {
longStringColumns.add(str.trim());
}
validateLongStringColumns(carbonSchema, longStringColumns);
}
// for the longstring field, change the datatype from string to varchar
this.schema = updateSchemaFields(carbonSchema, longStringColumns);
if (sortColumns != null && sortColumns.length != 0) {
if (options == null || options.get("sort_scope") == null) {
// If sort_columns are specified and sort_scope is not specified,
// change sort scope to local_sort as now by default sort scope is no_sort.
if (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
== null) {
if (options == null) {
options = new HashMap<>();
}
options.put("sort_scope", "local_sort");
}
}
}
if (carbonTable == null) {
// if carbonTable is not set by user, build it using schema
carbonTable = buildCarbonTable();
}
// build LoadModel
return buildLoadModel(carbonTable, timestamp, taskNo, options);
}
private void validateLongStringColumns(Schema carbonSchema, Set<String> longStringColumns) {
// long string columns must be string or varchar type
for (Field field : carbonSchema.getFields()) {
if (longStringColumns.contains(field.getFieldName().toLowerCase()) && (
(field.getDataType() != DataTypes.STRING) && field.getDataType() != DataTypes.VARCHAR)) {
throw new RuntimeException(
"long string column : " + field.getFieldName() + " is not supported for data type: "
+ field.getDataType());
}
}
// long string columns must not be present in sort columns
if (sortColumns != null) {
for (String col : sortColumns) {
// already will be in lower case
if (longStringColumns.contains(col)) {
throw new RuntimeException(
"long string column : " + col + "must not be present in sort columns");
}
}
}
}
/**
* Build a {@link CarbonTable}
*/
private CarbonTable buildCarbonTable() {
TableSchemaBuilder tableSchemaBuilder = TableSchema.builder();
if (blockSize > 0) {
tableSchemaBuilder = tableSchemaBuilder.blockSize(blockSize);
}
if (blockletSize > 0) {
tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize);
}
if (pageSizeInMb > 0) {
tableSchemaBuilder = tableSchemaBuilder.pageSizeInMb(pageSizeInMb);
}
tableSchemaBuilder.enableLocalDictionary(isLocalDictionaryEnabled);
tableSchemaBuilder.localDictionaryThreshold(localDictionaryThreshold);
List<String> sortColumnsList = new ArrayList<>();
if (sortColumns == null) {
// If sort columns are not specified, set all dimensions as sort columns by default.
// When dimensions are set as sort columns by default,
// inverted index will be supported by default for sort columns.
// Null check on field to handle holes in field[], e.g.
// user passed size 4 but supplied only 2 fields
for (Field field : schema.getFields()) {
if (null != field) {
if (field.getDataType() == DataTypes.STRING ||
field.getDataType() == DataTypes.DATE ||
field.getDataType() == DataTypes.TIMESTAMP) {
sortColumnsList.add(field.getFieldName());
}
}
}
sortColumns = new String[sortColumnsList.size()];
sortColumns = sortColumnsList.toArray(sortColumns);
} else {
sortColumnsList = Arrays.asList(sortColumns);
}
ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
List<String> invertedIdxColumnsList = new ArrayList<>();
if (null != invertedIndexColumns) {
invertedIdxColumnsList = Arrays.asList(invertedIndexColumns);
}
Field[] fields = schema.getFields();
buildTableSchema(fields, tableSchemaBuilder, sortColumnsList, sortColumnsSchemaList,
invertedIdxColumnsList);
tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
String dbName = "";
String tableName = "_tempTable_" + timestamp;
TableSchema schema = tableSchemaBuilder.build();
schema.setTableName(tableName);
CarbonTable table =
CarbonTable.builder().tableName(schema.getTableName()).databaseName(dbName).tablePath(path)
.tableSchema(schema).isTransactionalTable(false).build();
return table;
}
private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder,
List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList,
List<String> invertedIdxColumnsList) {
Set<String> uniqueFields = new HashSet<>();
// a counter which will be used in case of complex array type. This valIndex will be assigned
// to the children of complex array type in the order val1, val2, ... so that each array type
// child is differentiated at any level
AtomicInteger valIndex = new AtomicInteger(0);
// Check if any of the columns specified in sort columns are missing from schema.
for (String sortColumn : sortColumnsList) {
boolean exists = false;
for (Field field : fields) {
if (field.getFieldName().equalsIgnoreCase(sortColumn)) {
exists = true;
break;
}
}
if (!exists) {
throw new RuntimeException(
"column: " + sortColumn + " specified in sort columns does not exist in schema");
}
}
// Check if any of the columns specified in inverted index are missing from schema.
for (String invertedIdxColumn : invertedIdxColumnsList) {
boolean exists = false;
for (Field field : fields) {
if (field.getFieldName().equalsIgnoreCase(invertedIdxColumn)) {
exists = true;
break;
}
}
if (!exists) {
throw new RuntimeException("column: " + invertedIdxColumn
+ " specified in inverted index columns does not exist in schema");
}
}
for (Field field : fields) {
if (null != field) {
if (!uniqueFields.add(field.getFieldName())) {
throw new RuntimeException(
"Duplicate column " + field.getFieldName() + " found in table schema");
}
int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
int isInvertedIdxColumn = invertedIdxColumnsList.indexOf(field.getFieldName());
if (isSortColumn > -1) {
// unsupported sort column types: array, struct, map, double, float, decimal, varchar, binary
if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT
|| DataTypes.isDecimal(field.getDataType()) || field.getDataType().isComplexType()
|| field.getDataType() == DataTypes.VARCHAR
|| field.getDataType() == DataTypes.BINARY) {
String errorMsg =
"sort columns not supported for array, struct, map, double, float, decimal, "
+ "varchar, binary";
throw new RuntimeException(errorMsg);
}
}
if (field.getChildren() != null && field.getChildren().size() > 0) {
if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
// create an ArrayType from the data type of the single child column
DataType complexType =
DataTypes.createArrayType(field.getChildren().get(0).getDataType());
tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), complexType), valIndex, false,
isInvertedIdxColumn > -1);
} else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) {
// Loop through the inner columns and build a StructType from the child fields
List<StructField> structFieldsArray =
new ArrayList<StructField>(field.getChildren().size());
for (StructField childFld : field.getChildren()) {
structFieldsArray
.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
}
DataType complexType = DataTypes.createStructType(structFieldsArray);
tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), complexType), valIndex, false,
isInvertedIdxColumn > -1);
} else if (field.getDataType().getName().equalsIgnoreCase("MAP")) {
// Loop through the inner columns for MapType
DataType mapType = DataTypes.createMapType(((MapType) field.getDataType()).getKeyType(),
field.getChildren().get(0).getDataType());
tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), mapType), valIndex, false,
isInvertedIdxColumn > -1);
}
} else {
ColumnSchema columnSchema = tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), field.getDataType()), valIndex,
isSortColumn > -1, isInvertedIdxColumn > -1);
if (isSortColumn > -1) {
columnSchema.setSortColumn(true);
sortColumnsSchemaList[isSortColumn] = columnSchema;
}
}
}
}
}
/**
* Build a {@link CarbonLoadModel}
*/
private CarbonLoadModel buildLoadModel(CarbonTable table, long timestamp, String taskNo,
Map<String, String> options) throws InvalidLoadOptionException, IOException {
if (options == null) {
options = new HashMap<>();
}
CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
CarbonLoadModel build = builder.build(options, timestamp, taskNo);
setCsvHeader(build);
return build;
}
/* loop through all the parent columns and
change string fields that are specified as long string columns to varchar type */
private Schema updateSchemaFields(Schema schema, Set<String> longStringColumns) {
if (schema == null) {
return null;
}
Field[] fields = schema.getFields();
for (int i = 0; i < fields.length; i++) {
if (fields[i] != null) {
if (longStringColumns != null) {
/* Also update the string type to varchar */
if (longStringColumns.contains(fields[i].getFieldName())) {
fields[i].updateDataTypeToVarchar();
}
}
}
}
return new Schema(fields);
}
private void updateToLoadOptions(Map.Entry<String, String> entry) {
if (this.options == null) {
// convert it to treeMap as keys need to be case insensitive
this.options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
}
// update it to load options
this.options.put(entry.getKey(), entry.getValue());
}
private void withSortScope(Map.Entry<String, String> entry) {
String sortScope = entry.getValue();
if (sortScope != null) {
if ((!CarbonUtil.isValidSortOption(sortScope))) {
throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope);
} else if (sortScope.equalsIgnoreCase("global_sort")) {
throw new IllegalArgumentException("global sort is not supported");
}
}
// update it to load options
updateToLoadOptions(entry);
}
}