blob: 46c62b2f540a45fcef1d2aca77604ffce96dc253 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import org.apache.calcite.rel.type.RelDataType;
import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import io.netty.buffer.DrillBuf;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.holders.Decimal18Holder;
import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal9Holder;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.NullableDateVector;
import org.apache.drill.exec.vector.NullableDecimal18Vector;
import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
import org.apache.drill.exec.vector.NullableDecimal9Vector;
import org.apache.drill.exec.vector.NullableFloat4Vector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.NullableVarDecimalVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
public class HiveUtilities {
private static final Logger logger = LoggerFactory.getLogger(HiveUtilities.class);
* Partition value is received in string format. Convert it into appropriate object based on the type.
* @param typeInfo type info
* @param value partition values
* @param defaultPartitionValue default partition value
* @return converted object
public static Object convertPartitionType(TypeInfo typeInfo, String value, final String defaultPartitionValue) {
if (typeInfo.getCategory() != Category.PRIMITIVE) {
// In Hive only primitive types are allowed as partition column types.
throw new DrillRuntimeException("Non-Primitive types are not allowed as partition column type in Hive, " +
"but received one: " + typeInfo.getCategory());
if (defaultPartitionValue.equals(value)) {
return null;
final PrimitiveCategory pCat = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
try {
switch (pCat) {
case BINARY:
return value.getBytes();
return Boolean.parseBoolean(value);
case DECIMAL: {
DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
return HiveDecimalUtils.enforcePrecisionScale(HiveDecimal.create(value), decimalTypeInfo);
case DOUBLE:
return Double.parseDouble(value);
case FLOAT:
return Float.parseFloat(value);
case BYTE:
case SHORT:
case INT:
return Integer.parseInt(value);
case LONG:
return Long.parseLong(value);
case STRING:
return value.getBytes();
case CHAR:
return value.trim().getBytes();
return Timestamp.valueOf(value);
case DATE:
return Date.valueOf(value);
} catch(final Exception e) {
// In Hive, partition values that can't be converted from string are considered to be NULL.
logger.trace("Failed to interpret '{}' value from partition value string '{}'", pCat, value);
return null;
return null;
* Populates vector with given value based on its type.
* @param vector vector instance
* @param managedBuffer Drill duffer
* @param val value
* @param start start position
* @param end end position
public static void populateVector(final ValueVector vector, final DrillBuf managedBuffer, final Object val,
final int start, final int end) {
TypeProtos.MinorType type = vector.getField().getType().getMinorType();
switch(type) {
NullableVarBinaryVector v = (NullableVarBinaryVector) vector;
byte[] value = (byte[]) val;
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value, 0, value.length);
case BIT: {
NullableBitVector v = (NullableBitVector) vector;
Boolean value = (Boolean) val;
for (int i = start; i < end; i++) {
v.getMutator().set(i, value ? 1 : 0);
case FLOAT8: {
NullableFloat8Vector v = (NullableFloat8Vector) vector;
double value = (double) val;
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value);
case FLOAT4: {
NullableFloat4Vector v = (NullableFloat4Vector) vector;
float value = (float) val;
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value);
case INT: {
NullableIntVector v = (NullableIntVector) vector;
int value = (int) val;
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value);
case BIGINT: {
NullableBigIntVector v = (NullableBigIntVector) vector;
long value = (long) val;
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value);
case VARCHAR: {
NullableVarCharVector v = (NullableVarCharVector) vector;
byte[] value = (byte[]) val;
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value, 0, value.length);
NullableTimeStampVector v = (NullableTimeStampVector) vector;
DateTime ts = new DateTime(((Timestamp) val).getTime()).withZoneRetainFields(DateTimeZone.UTC);
long value = ts.getMillis();
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value);
case DATE: {
NullableDateVector v = (NullableDateVector) vector;
DateTime date = new DateTime(((Date)val).getTime()).withZoneRetainFields(DateTimeZone.UTC);
long value = date.getMillis();
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value);
case DECIMAL9: {
final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
final NullableDecimal9Vector v = ((NullableDecimal9Vector) vector);
final Decimal9Holder holder = new Decimal9Holder();
holder.scale = v.getField().getScale();
holder.precision = v.getField().getPrecision();
holder.value = DecimalUtility.getDecimal9FromBigDecimal(value, holder.scale);
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, holder);
case DECIMAL18: {
final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
final NullableDecimal18Vector v = ((NullableDecimal18Vector) vector);
final Decimal18Holder holder = new Decimal18Holder();
holder.scale = v.getField().getScale();
holder.precision = v.getField().getPrecision();
holder.value = DecimalUtility.getDecimal18FromBigDecimal(value, holder.scale);
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, holder);
final int needSpace = Decimal28SparseHolder.nDecimalDigits * DecimalUtility.INTEGER_SIZE;
Preconditions.checkArgument(managedBuffer.capacity() > needSpace,
String.format("Not sufficient space in given managed buffer. Need %d bytes, buffer has %d bytes",
needSpace, managedBuffer.capacity()));
final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
final NullableDecimal28SparseVector v = ((NullableDecimal28SparseVector) vector);
final Decimal28SparseHolder holder = new Decimal28SparseHolder();
holder.scale = v.getField().getScale();
holder.precision = v.getField().getPrecision();
holder.buffer = managedBuffer;
holder.start = 0;
DecimalUtility.getSparseFromBigDecimal(value, holder.buffer, 0, holder.scale,
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, holder);
final int needSpace = Decimal38SparseHolder.nDecimalDigits * DecimalUtility.INTEGER_SIZE;
Preconditions.checkArgument(managedBuffer.capacity() > needSpace,
String.format("Not sufficient space in given managed buffer. Need %d bytes, buffer has %d bytes",
needSpace, managedBuffer.capacity()));
final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
final NullableDecimal38SparseVector v = ((NullableDecimal38SparseVector) vector);
final Decimal38SparseHolder holder = new Decimal38SparseHolder();
holder.scale = v.getField().getScale();
holder.precision = v.getField().getPrecision();
holder.buffer = managedBuffer;
holder.start = 0;
DecimalUtility.getSparseFromBigDecimal(value, holder.buffer, 0, holder.scale,
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, holder);
final BigDecimal value = ((HiveDecimal) val).bigDecimalValue()
.setScale(vector.getField().getScale(), RoundingMode.HALF_UP);
final NullableVarDecimalVector v = ((NullableVarDecimalVector) vector);
for (int i = start; i < end; i++) {
v.getMutator().setSafe(i, value);
* Obtains major type from given type info holder.
* @param typeInfo type info holder
* @param options session options
* @return appropriate major type, null otherwise. For some types may throw unsupported exception.
public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
switch (typeInfo.getCategory()) {
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
MinorType minorType = HiveUtilities.getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo, options);
MajorType.Builder typeBuilder = MajorType.newBuilder().setMinorType(minorType)
.setMode(DataMode.OPTIONAL); // Hive columns (both regular and partition) could have null values
switch (primitiveTypeInfo.getPrimitiveCategory()) {
case CHAR:
BaseCharTypeInfo baseCharTypeInfo = (BaseCharTypeInfo) primitiveTypeInfo;
DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
// do nothing, other primitive categories do not have precision or scale
case LIST:
case MAP:
case STRUCT:
case UNION:
return null;
* Obtains minor type from given primitive type info holder.
* @param primitiveTypeInfo primitive type info holder
* @param options session options
* @return appropriate minor type, otherwise throws unsupported type exception
public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, OptionSet options) {
switch(primitiveTypeInfo.getPrimitiveCategory()) {
case BINARY:
return TypeProtos.MinorType.VARBINARY;
return TypeProtos.MinorType.BIT;
case DECIMAL: {
if (!options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val) {
throw UserException.unsupportedError()
return MinorType.VARDECIMAL;
case DOUBLE:
return TypeProtos.MinorType.FLOAT8;
case FLOAT:
return TypeProtos.MinorType.FLOAT4;
// TODO (DRILL-2470)
// Byte and short (tinyint and smallint in SQL types) are currently read as integers
// as these smaller integer types are not fully supported in Drill today.
case SHORT:
case BYTE:
case INT:
return TypeProtos.MinorType.INT;
case LONG:
return TypeProtos.MinorType.BIGINT;
case STRING:
case CHAR:
return TypeProtos.MinorType.VARCHAR;
return TypeProtos.MinorType.TIMESTAMP;
case DATE:
return TypeProtos.MinorType.DATE;
return null;
* Utility method which gets table or partition {@link InputFormat} class. First it
* tries to get the class name from given StorageDescriptor object. If it doesn't contain it tries to get it from
* StorageHandler class set in table properties. If not found throws an exception.
* @param job {@link JobConf} instance needed incase the table is StorageHandler based table.
* @param sd {@link StorageDescriptor} instance of currently reading partition or table (for non-partitioned tables).
* @param table Table object
public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd, final Table table) throws Exception {
final String inputFormatName = sd.getInputFormat();
if (Strings.isNullOrEmpty(inputFormatName)) {
final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE);
if (Strings.isNullOrEmpty(storageHandlerClass)) {
throw new ExecutionSetupException("Unable to get Hive table InputFormat class. There is neither " +
"InputFormat class explicitly specified nor StorageHandler class");
final HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(job, storageHandlerClass);
TableDesc tableDesc = new TableDesc();
tableDesc.setProperties(new org.apache.hadoop.hive.ql.metadata.Table(table).getMetadata());
storageHandler.configureInputJobProperties(tableDesc, table.getParameters());
return (Class<? extends InputFormat<?, ?>>) storageHandler.getInputFormatClass();
} else {
return (Class<? extends InputFormat<?, ?>>) Class.forName(inputFormatName);
* Utility method which adds give configs to {@link JobConf} object.
* @param job {@link JobConf} instance.
* @param properties New config properties
public static void addConfToJob(final JobConf job, final Properties properties) {
for (Object obj : properties.keySet()) {
job.set((String) obj, (String) properties.get(obj));
* Wrapper around {@code MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition, Table)}
* which also adds parameters from table to properties returned by that method.
* @param partition the source of partition level parameters
* @param table the source of table level parameters
* @return properties
public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
restoreColumns(table, partition);
try {
Properties properties = new org.apache.hadoop.hive.ql.metadata.Partition(new org.apache.hadoop.hive.ql.metadata.Table(table), partition).getMetadataFromPartitionSchema();
// SerDe expects properties from Table, but above call doesn't add Table properties.
// Include Table properties in final list in order to not to break SerDes that depend on
// Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
.filter(e -> e.getKey() != null && e.getValue() != null)
.forEach(e -> properties.put(e.getKey(), e.getValue()));
return properties;
} catch (HiveException e) {
throw new DrillRuntimeException(e);
* Sets columns from table cache to table and partition.
* @param table the source of column lists cache
* @param partition partition which will set column list
public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) {
// exactly the same column lists for partitions or table
// stored only one time to reduce physical plan serialization
if (partition != null && partition.getSd().getCols() == null) {
if (table.getSd().getCols() == null) {
* Wrapper around {@code MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
* which also sets columns from table cache to table and returns properties returned by
* {@code MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
* @param table Hive table with cached columns
* @return Hive table metadata
public static Properties getTableMetadata(HiveTableWithColumnCache table) {
restoreColumns(table, null);
return new org.apache.hadoop.hive.ql.metadata.Table(table).getMetadata();
* Generates unsupported types exception message with list of supported types
* and throws user exception.
* @param unsupportedType unsupported type
public static void throwUnsupportedHiveDataTypeError(String unsupportedType) {
StringBuilder errMsg = new StringBuilder()
.append("Unsupported Hive data type ").append(unsupportedType).append(". ")
.append("Following Hive data types are supported in Drill for querying: ")
throw UserException.unsupportedError()
* Returns property value. If property is absent, return given default value.
* If property value is non-numeric will fail.
* @param tableProperties table properties
* @param propertyName property name
* @param defaultValue default value used in case if property is absent
* @return property value
* @throws NumberFormatException if property value is not numeric
public static int retrieveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
Object propertyObject = tableProperties.get(propertyName);
if (propertyObject == null) {
return defaultValue;
try {
return Integer.valueOf(propertyObject.toString());
} catch (NumberFormatException e) {
throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric",
propertyName, propertyObject.toString()));
* Checks if given table has header or footer.
* If at least one of them has value more then zero, method will return true.
* @param table table with column cache instance
* @return true if table contains header or footer, false otherwise
public static boolean hasHeaderOrFooter(HiveTableWithColumnCache table) {
Properties tableProperties = getTableMetadata(table);
int skipHeader = retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);
int skipFooter = retrieveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, -1);
return skipHeader > 0 || skipFooter > 0;
* This method checks whether the table is transactional and set necessary properties in {@link JobConf}.<br>
* If schema evolution properties aren't set in job conf for the input format, method sets the column names
* and types from table/partition properties or storage descriptor.
* @param job the job to update
* @param sd storage descriptor
public static void verifyAndAddTransactionalProperties(JobConf job, StorageDescriptor sd) {
if (AcidUtils.isTablePropertyTransactional(job)) {
HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
// No work is needed, if schema evolution is used
if (Utilities.isSchemaEvolutionEnabled(job, true) && job.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS) != null &&
job.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES) != null) {
String colNames;
String colTypes;
// Try to get get column names and types from table or partition properties. If they are absent there, get columns
// data from storage descriptor of the table
colNames = job.get(serdeConstants.LIST_COLUMNS);
colTypes = job.get(serdeConstants.LIST_COLUMN_TYPES);
if (colNames == null || colTypes == null) {
colNames = sd.getCols().stream()
colTypes = sd.getCols().stream()
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, colNames);
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, colTypes);
* Rule is matched when all of the following match:
* <ul>
* <li>GroupScan in given DrillScalRel is an {@link HiveScan}</li>
* <li> {@link HiveScan} is not already rewritten using Drill's native readers</li>
* <li> InputFormat in table metadata and all partitions metadata contains the same value {@param tableInputFormatClass}</li>
* <li> No error occurred while checking for the above conditions. An error is logged as warning.</li>
* @param call rule call
* @return True if the rule can be applied. False otherwise
public static boolean nativeReadersRuleMatches(RelOptRuleCall call, Class tableInputFormatClass) {
final DrillScanRel scanRel = call.rel(0);
if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) {
return false;
final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
final HiveConf hiveConf = hiveScan.getHiveConf();
final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable();
if (HiveUtilities.isParquetTableContainsUnsupportedType(hiveTable)) {
return false;
final Class<? extends InputFormat<?, ?>> tableInputFormat = getInputFormatFromSD(
HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(), hiveConf);
if (tableInputFormat == null || !tableInputFormat.equals(tableInputFormatClass)) {
return false;
final List<HiveTableWrapper.HivePartitionWrapper> partitions = hiveScan.getHiveReadEntry().getHivePartitionWrappers();
if (partitions == null) {
return true;
final List<FieldSchema> tableSchema = hiveTable.getSd().getCols();
// Make sure all partitions have the same input format as the table input format
for (HiveTableWrapper.HivePartitionWrapper partition : partitions) {
final StorageDescriptor partitionSD = partition.getPartition().getSd();
Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD(HiveUtilities.getPartitionMetadata(
partition.getPartition(), hiveTable), hiveScan.getHiveReadEntry(), partitionSD, hiveConf);
if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
return false;
// Make sure the schema of the table and schema of the partition matches. If not return false. Schema changes
// between table and partition can happen when table schema is altered using ALTER statements after some
// partitions are already created. Currently native reader conversion doesn't handle schema changes between
// partition and table. Hive has extensive list of convert methods to convert from one type to rest of the
// possible types. Drill doesn't have the similar set of methods yet.
if (!partitionSD.getCols().equals(tableSchema)) {
logger.debug("Partitions schema is different from table schema. Currently native reader conversion can't " +
"handle schema difference between partitions and table");
return false;
return true;
* Get the input format from given {@link StorageDescriptor}.
* @param properties table properties
* @param hiveReadEntry hive read entry
* @param sd storage descriptor
* @return {@link InputFormat} class or null if a failure has occurred. Failure is logged as warning.
private static Class<? extends InputFormat<?, ?>> getInputFormatFromSD(final Properties properties,
final HiveReadEntry hiveReadEntry, final StorageDescriptor sd, final HiveConf hiveConf) {
final Table hiveTable = hiveReadEntry.getTable();
try {
final String inputFormatName = sd.getInputFormat();
if (!Strings.isNullOrEmpty(inputFormatName)) {
return (Class<? extends InputFormat<?, ?>>) Class.forName(inputFormatName);
final JobConf job = new JobConf(hiveConf);
HiveUtilities.addConfToJob(job, properties);
return HiveUtilities.getInputFormatClass(job, sd, hiveTable);
} catch (final Exception e) {
logger.warn("Failed to get InputFormat class from Hive table '{}.{}'. StorageDescriptor [{}]",
hiveTable.getDbName(), hiveTable.getTableName(), sd.toString(), e);
return null;
* Hive doesn't support union type for parquet tables yet.
* See <a href=""><a/>
* @param hiveTable Thrift table from Hive Metastore
* @return true if table contains unsupported data types, false otherwise
private static boolean isParquetTableContainsUnsupportedType(final Table hiveTable) {
for (FieldSchema hiveField : hiveTable.getSd().getCols()) {
final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
if (category == Category.UNION) {
logger.debug("Hive table contains unsupported data type: {}", category);
return true;
return false;
* Creates HiveConf based on given list of configuration properties.
* @param properties config properties
* @return instance of HiveConf
public static HiveConf generateHiveConf(Map<String, String> properties) {
logger.trace("Override HiveConf with the following properties {}", properties);
HiveConf hiveConf = new HiveConf();
return hiveConf;
* Creates HiveConf based on properties in given HiveConf and configuration properties.
* @param hiveConf hive conf
* @param properties config properties
* @return instance of HiveConf
public static HiveConf generateHiveConf(HiveConf hiveConf, Map<String, String> properties) {
Properties changedProperties = hiveConf.getChangedProperties();
HiveConf newHiveConf = new HiveConf();
.forEach(name -> newHiveConf.set(name, changedProperties.getProperty(name)));
return newHiveConf;
* Helper method which stores partition columns in table columnListCache. If table columnListCache has exactly the
* same columns as partition, in partition stores columns index that corresponds to identical column list.
* If table columnListCache hasn't such column list, the column list adds to table columnListCache and in partition
* stores columns index that corresponds to column list.
* @param table hive table instance
* @param partition partition instance
* @return hive partition wrapper
public static HiveTableWrapper.HivePartitionWrapper createPartitionWithSpecColumns(HiveTableWithColumnCache table, Partition partition) {
int listIndex = table.getColumnListsCache().addOrGet(partition.getSd().getCols());
return new HiveTableWrapper.HivePartitionWrapper(new HivePartition(partition, listIndex));
* Converts specified {@code FieldSchema column} into {@link ColumnMetadata}.
* For the case when specified relDataType is struct, map with recursively converted children
* will be created.
* @param dataTypeConverter converter to obtain Calcite's types from Hive's ones
* @param column column to convert
* @return {@link ColumnMetadata} which corresponds to specified {@code FieldSchema column}
public static ColumnMetadata getColumnMetadata(HiveToRelDataTypeConverter dataTypeConverter, FieldSchema column) {
RelDataType relDataType = dataTypeConverter.convertToNullableRelDataType(column);
return SchemaUtils.getColumnMetadata(column.getName(), relDataType);