/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.hive.util;

import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.carbondata.common.Strings;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.Field;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

import org.apache.log4j.Logger;

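/**
 * Helper methods for CarbonData's Hive integration: building {@link CarbonLoadModel}s from
 * Hive table properties, creating and persisting Carbon table schemas, and supplying a
 * {@link HiveMetaHook} that keeps the Carbon schema file in sync with Hive DDL operations.
 */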
public class HiveCarbonUtil {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(HiveCarbonUtil.class.getName());
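
  /**
   * Builds a {@link CarbonLoadModel} for a transactional table from the table properties
   * carried in a Hadoop {@link Configuration}: table name, location, columns and their types,
   * sort columns, and complex-type delimiters. Partition columns, when present, are appended
   * to the regular column list before the model is built.
   */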
  public static CarbonLoadModel getCarbonLoadModel(Configuration tableProperties) {
    String[] tableUniqueName = tableProperties.get("name").split("\\.");
    String databaseName = tableUniqueName[0];
    String tableName = tableUniqueName[1];
    String tablePath = tableProperties.get(hive_metastoreConstants.META_TABLE_LOCATION);
    String columns = tableProperties.get(hive_metastoreConstants.META_TABLE_COLUMNS);
    String sortColumns = tableProperties.get("sort_columns");
    String columnTypes = tableProperties.get(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
    String partitionColumns =
        tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
    String partitionColumnTypes =
        tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
    if (partitionColumns != null) {
      columns = columns + "," + partitionColumns;
      columnTypes = columnTypes + ":" + partitionColumnTypes;
    }
    String[] columnTypeArray = splitSchemaStringToArray(columnTypes);
    String complexDelim = tableProperties.get("complex_delimiter", "");
    CarbonLoadModel carbonLoadModel =
        getCarbonLoadModel(tableName, databaseName, tablePath, sortColumns, columns.split(","),
            columnTypeArray, tableProperties);
    carbonLoadModel.setCarbonTransactionalTable(true);
    carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().setTransactionalTable(true);
    for (String delim : complexDelim.split(",")) {
      carbonLoadModel.setComplexDelimiter(delim);
    }
    return carbonLoadModel;
  }
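
  /**
   * Variant of {@link #getCarbonLoadModel(Configuration)} that reads the same table metadata
   * from a {@link Properties} object instead of the Configuration, with the Configuration
   * passed through separately for file-system access.
   */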
  public static CarbonLoadModel getCarbonLoadModel(Properties tableProperties,
      Configuration configuration) {
    String[] tableUniqueName = tableProperties.getProperty("name").split("\\.");
    String databaseName = tableUniqueName[0];
    String tableName = tableUniqueName[1];
    String tablePath = tableProperties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
    String columns = tableProperties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
    String sortColumns = tableProperties.getProperty("sort_columns");
    String[] columnTypes = splitSchemaStringToArray(tableProperties.getProperty("columns.types"));
    String complexDelim = tableProperties.getProperty("complex_delimiter", "");
    CarbonLoadModel carbonLoadModel =
        getCarbonLoadModel(tableName, databaseName, tablePath, sortColumns, columns.split(","),
            columnTypes, configuration);
    for (String delim : complexDelim.split(",")) {
      carbonLoadModel.setComplexDelimiter(delim);
    }
    return carbonLoadModel;
  }
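
  /**
   * Builds a {@link CarbonLoadModel} from explicit table metadata. The Carbon table is resolved
   * in order of preference: from an existing schema file under the table location; else, if a
   * CarbonData file is found at the location, by inferring the schema from it; else by
   * constructing a fresh schema from the given Hive columns and types. In the latter two cases
   * the table is marked non-transactional.
   */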
  public static CarbonLoadModel getCarbonLoadModel(String tableName, String databaseName,
      String location, String sortColumnsString, String[] columns, String[] columnTypes,
      Configuration configuration) {
    CarbonLoadModel loadModel;
    CarbonTable carbonTable;
    try {
      String schemaFilePath = CarbonTablePath.getSchemaFilePath(location);
      AbsoluteTableIdentifier absoluteTableIdentifier =
          AbsoluteTableIdentifier.from(location, databaseName, tableName, "");
      if (FileFactory.getCarbonFile(schemaFilePath).exists()) {
        carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
      } else {
        String carbonDataFile = CarbonUtil.getFilePathExternalFilePath(location, configuration);
        if (carbonDataFile == null) {
          carbonTable = CarbonTable.buildFromTableInfo(
              getTableInfo(tableName, databaseName, location, sortColumnsString, columns,
                  columnTypes, new ArrayList<>()));
        } else {
          carbonTable = CarbonTable.buildFromTableInfo(
              SchemaReader.inferSchema(absoluteTableIdentifier, false, configuration));
        }
        carbonTable.setTransactionalTable(false);
      }
    } catch (SQLException | IOException e) {
      throw new RuntimeException("Unable to fetch schema for the table: " + tableName, e);
    }
    CarbonLoadModelBuilder carbonLoadModelBuilder = new CarbonLoadModelBuilder(carbonTable);
    Map<String, String> options = new HashMap<>();
    options.put("fileheader", Strings.mkString(columns, ","));
    try {
      loadModel = carbonLoadModelBuilder.build(options, System.currentTimeMillis(), "");
    } catch (InvalidLoadOptionException | IOException e) {
      throw new RuntimeException(e);
    }
    loadModel.setSkipParsers();
    loadModel.setMetrics(new DataLoadMetrics());
    return loadModel;
  }
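
  /**
   * Creates a {@link TableInfo} from Hive column metadata. Columns listed in
   * {@code partitionColumns} are collected into a NATIVE_HIVE {@link PartitionInfo} rather than
   * the regular column list; the sort-column flag for each column is taken from the
   * comma-separated {@code sortColumnsString}.
   */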
  private static TableInfo getTableInfo(String tableName, String databaseName, String location,
      String sortColumnsString, String[] columns, String[] columnTypes,
      List<String> partitionColumns) throws SQLException {
    TableInfo tableInfo = new TableInfo();
    TableSchemaBuilder builder = new TableSchemaBuilder();
    builder.tableName(tableName);
    List<String> sortColumns = new ArrayList<>();
    if (sortColumnsString != null) {
      sortColumns = Arrays.asList(sortColumnsString.toLowerCase().split(","));
    }
    PartitionInfo partitionInfo = null;
    AtomicInteger integer = new AtomicInteger();
    List<StructField> partitionStructFields = new ArrayList<>();
    for (int i = 0; i < columns.length; i++) {
      DataType dataType = DataTypeUtil.convertHiveTypeToCarbon(columnTypes[i]);
      Field field = new Field(columns[i].toLowerCase(), dataType);
      if (partitionColumns.contains(columns[i])) {
        partitionStructFields
            .add(new StructField(columns[i].toLowerCase(), dataType, field.getChildren()));
      } else {
        builder.addColumn(new StructField(columns[i].toLowerCase(), dataType, field.getChildren()),
            integer, sortColumns.contains(columns[i]), false);
      }
    }
    if (!partitionStructFields.isEmpty()) {
      List<ColumnSchema> partitionColumnSchemas = new ArrayList<>();
      for (StructField partitionStructField : partitionStructFields) {
        partitionColumnSchemas.add(builder.addColumn(partitionStructField, integer,
            sortColumns.contains(partitionStructField.getFieldName()), false));
      }
      partitionInfo = new PartitionInfo(partitionColumnSchemas, PartitionType.NATIVE_HIVE);
    }
    TableSchema tableSchema = builder.build();
    SchemaEvolution schemaEvol = new SchemaEvolution();
    List<SchemaEvolutionEntry> schemaEvolutionEntry = new ArrayList<>();
    schemaEvolutionEntry.add(new SchemaEvolutionEntry());
    schemaEvol.setSchemaEvolutionEntryList(schemaEvolutionEntry);
    tableSchema.setSchemaEvolution(schemaEvol);
    tableSchema.setPartitionInfo(partitionInfo);
    tableInfo.setDatabaseName(databaseName);
    tableInfo.setTablePath(location);
    tableInfo.setFactTable(tableSchema);
    return tableInfo;
  }
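
  /**
   * Converts the given {@link TableInfo} to its Thrift representation and writes it to the
   * schema file under the table's metadata directory, overwriting any existing schema file.
   */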
  private static void writeSchemaFile(TableInfo tableInfo) throws IOException {
    ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
    String schemaFilePath = CarbonTablePath.getSchemaFilePath(tableInfo.getTablePath());
    String metadataPath = CarbonTablePath.getMetadataPath(tableInfo.getTablePath());
    FileFactory.mkdirs(metadataPath);
    ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
    thriftWriter.open(FileWriteOperation.OVERWRITE);
    thriftWriter.write(schemaConverter
        .fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
            tableInfo.getFactTable().getTableName()));
    thriftWriter.close();
    FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(System.currentTimeMillis());
  }
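
  /**
   * Returns a {@link HiveMetaHook} that mirrors Hive DDL into Carbon metadata: on a successful
   * CREATE TABLE it derives a Carbon schema from the Hive columns (including partition keys)
   * and writes the schema file; on DROP TABLE, or when a create is rolled back, it deletes the
   * table directory.
   */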
  public static HiveMetaHook getMetaHook() {
    return new HiveMetaHook() {

      @Override
      public void preCreateTable(Table table) throws MetaException {
      }

      @Override
      public void rollbackCreateTable(Table table) throws MetaException {
        commitDropTable(table, false);
      }

      @Override
      public void commitCreateTable(Table table) throws MetaException {
        try {
          List<FieldSchema> fieldSchemas = table.getSd().getCols();
          String[] columns = new String[fieldSchemas.size() + table.getPartitionKeys().size()];
          String[] columnTypes = new String[fieldSchemas.size() + table.getPartitionKeys().size()];
          int i = 0;
          for (FieldSchema fieldSchema : table.getSd().getCols()) {
            columns[i] = fieldSchema.getName();
            columnTypes[i++] = fieldSchema.getType();
          }
          List<String> partitionColumns = new ArrayList<>();
          for (FieldSchema partitionCol : table.getPartitionKeys()) {
            columns[i] = partitionCol.getName().toLowerCase();
            columnTypes[i++] = partitionCol.getType();
            partitionColumns.add(partitionCol.getName().toLowerCase());
          }
          TableInfo tableInfo =
              getTableInfo(table.getTableName(), table.getDbName(), table.getSd().getLocation(),
                  table.getParameters().getOrDefault("sort_columns", ""), columns, columnTypes,
                  partitionColumns);
          tableInfo.getFactTable().getTableProperties().putAll(table.getParameters());
          writeSchemaFile(tableInfo);
        } catch (IOException | SQLException e) {
          LOGGER.error(e);
          throw new MetaException("Problem while writing schema file: " + e.getMessage());
        }
      }

      @Override
      public void preDropTable(Table table) throws MetaException {
      }

      @Override
      public void rollbackDropTable(Table table) throws MetaException {
      }

      @Override
      public void commitDropTable(Table table, boolean b) throws MetaException {
        FileFactory.deleteAllFilesOfDir(new File(table.getSd().getLocation()));
      }
    };
  }
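
  /**
   * Splits a colon-separated Hive type string into individual type tokens, treating colons
   * inside angle brackets as part of a nested type rather than as separators. For example,
   * {@code "int:string:struct<a:int,b:string>"} yields
   * {@code ["int", "string", "struct<a:int,b:string>"]}.
   */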
  public static String[] splitSchemaStringToArray(String schema) {
    List<String> tokens = new ArrayList<>();
    StringBuilder stack = new StringBuilder();
    int openingCount = 0;
    for (int i = 0; i < schema.length(); i++) {
      if (schema.charAt(i) == '<') {
        openingCount++;
        stack.append(schema.charAt(i));
      } else if (schema.charAt(i) == '>') {
        --openingCount;
        if (i == schema.length() - 1) {
          stack.append(schema.charAt(i));
          tokens.add(stack.toString());
          stack = new StringBuilder();
          openingCount = 0;
        } else {
          stack.append(schema.charAt(i));
        }
      } else if (schema.charAt(i) == ':' && openingCount > 0) {
        stack.append(schema.charAt(i));
      } else if (schema.charAt(i) == ':' && openingCount == 0) {
        tokens.add(stack.toString());
        stack = new StringBuilder();
        openingCount = 0;
      } else if (i == schema.length() - 1) {
        stack.append(schema.charAt(i));
        tokens.add(stack.toString());
        stack = new StringBuilder();
        openingCount = 0;
      } else {
        stack.append(schema.charAt(i));
      }
    }
    return tokens.toArray(new String[tokens.size()]);
  }
}