// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.NullLiteral;
import org.apache.impala.analysis.PartitionKeyValue;
import org.apache.impala.analysis.ToSqlUtils;
import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.local.CatalogdMetaProvider;
import org.apache.impala.catalog.local.LocalCatalog;
import org.apache.impala.catalog.local.MetaProvider;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TColumnDescriptor;
import org.apache.impala.thrift.TGetCatalogMetricsResult;
import org.apache.impala.thrift.THdfsPartition;
import org.apache.impala.thrift.TTableStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
/**
* Static utility functions shared between FeCatalog implementations.
*/
public abstract class FeCatalogUtils {
private static final Logger LOG = LoggerFactory.getLogger(FeCatalogUtils.class);
/**
* Gets the Type from the given FieldSchema by using Impala's SQL parser.
*
* The type can be:
* - Supported by Impala, in which case the type is returned.
* - A type Impala understands but has not yet implemented (e.g. date), in which case
*   the type is returned but type.isSupported() returns false.
* - A supported type that exceeds an Impala limit (e.g. the maximum nesting depth),
*   in which case a TableLoadingException is thrown.
* - A type Impala cannot understand at all, in which case a TableLoadingException is
*   thrown.
*
* Throws a TableLoadingException if the FieldSchema could not be parsed. In this
* case, 'tableName' is included in the error message.
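*
* A minimal illustrative sketch (the column name, type string and table name below
* are hypothetical):
* <pre>{@code
*   FieldSchema fs = new FieldSchema("id", "bigint", null);
*   Type t = FeCatalogUtils.parseColumnType(fs, "db.tbl");  // returns the BIGINT type
* }</pre>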
*/
public static Type parseColumnType(FieldSchema fs, String tableName)
throws TableLoadingException {
Type type = Type.parseColumnType(fs.getType());
if (type == null) {
throw new TableLoadingException(String.format(
"Unsupported type '%s' in column '%s' of table '%s'",
fs.getType(), fs.getName(), tableName));
}
if (type.exceedsMaxNestingDepth()) {
throw new TableLoadingException(String.format(
"Type exceeds the maximum nesting depth of %s:\n%s",
Type.MAX_NESTING_DEPTH, type.toSql()));
}
return type;
}
/**
* Converts a list of HMS FieldSchemas to internal Column objects, preserving order.
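*
* Illustrative sketch (the field names, types and table name are hypothetical):
* <pre>{@code
*   ImmutableList<Column> cols = FeCatalogUtils.fieldSchemasToColumns(
*       ImmutableList.of(new FieldSchema("id", "bigint", null),
*                        new FieldSchema("name", "string", null)),
*       "db.tbl");
*   // cols.get(1).getPosition() == 1
* }</pre>
*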
* @throws TableLoadingException if any type is invalid
*/
public static ImmutableList<Column> fieldSchemasToColumns(
Iterable<FieldSchema> fieldSchemas,
String tableName) throws TableLoadingException {
int pos = 0;
ImmutableList.Builder<Column> ret = ImmutableList.builder();
for (FieldSchema s : fieldSchemas) {
Type type = parseColumnType(s, tableName);
ret.add(new Column(s.getName(), type, s.getComment(), pos));
++pos;
}
return ret.build();
}
/**
* Validates that the given clustering columns have types that support table
* partitioning.
*
* TODO(todd): consider refactoring to combine with
* HdfsTable.addColumnsFromFieldSchema
*
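* A short sketch (hypothetical column): an INT partition column is accepted, while a
* column of a complex or otherwise unsupported type makes this throw:
* <pre>{@code
*   List<Column> cols = FeCatalogUtils.fieldSchemasToColumns(
*       ImmutableList.of(new FieldSchema("year", "int", null)), "db.tbl");
*   FeCatalogUtils.validateClusteringColumns(cols, "db.tbl");  // passes
* }</pre>
*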
* @throws TableLoadingException if the columns are invalid
*/
public static void validateClusteringColumns(
Iterable<Column> cols, String tableName)
throws TableLoadingException {
// Check if we support partitioning on columns of such a type.
for (Column c : cols) {
Type type = c.getType();
if (!type.supportsTablePartitioning()) {
throw new TableLoadingException(
String.format("Failed to load metadata for table '%s' because of " +
"unsupported partition-column type '%s' in partition column '%s'",
tableName, type.toString(), c.getName()));
}
}
}
// TODO(todd): move to a default method in FeTable in Java8
public static List<TColumnDescriptor> getTColumnDescriptors(FeTable table) {
List<TColumnDescriptor> colDescs = new ArrayList<>();
for (Column col: table.getColumns()) {
colDescs.add(new TColumnDescriptor(col.getName(), col.getType().toThrift()));
}
return colDescs;
}
/**
* Given the list of column stats returned from the metastore, inject those
* stats into matching columns in 'table'.
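*
* A minimal sketch, assuming an already-loaded FeTable 'tbl' and stats fetched from
* the HMS (the column name, type and counts below are hypothetical):
* <pre>{@code
*   ColumnStatisticsData data = ColumnStatisticsData.longStats(
*       new LongColumnStatsData(0, 42));  // numNulls=0, numDVs=42
*   FeCatalogUtils.injectColumnStats(
*       ImmutableList.of(new ColumnStatisticsObj("id", "bigint", data)), tbl);
* }</pre>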
*/
public static void injectColumnStats(List<ColumnStatisticsObj> colStats,
FeTable table) {
for (ColumnStatisticsObj stats: colStats) {
Column col = table.getColumn(stats.getColName());
Preconditions.checkNotNull(col, "Unable to find column %s in table %s",
stats.getColName(), table.getFullName());
if (!ColumnStats.isSupportedColType(col.getType())) {
LOG.warn(String.format(
"Statistics for %s, column %s are not supported as column " +
"has type %s", table.getFullName(), col.getName(), col.getType()));
continue;
}
if (!col.updateStats(stats.getStatsData())) {
LOG.warn(String.format(
"Failed to load column stats for %s, column %s. Stats may be " +
"incompatible with column type %s. Consider regenerating statistics " +
"for %s.", table.getFullName(), col.getName(), col.getType(),
table.getFullName()));
continue;
}
}
}
/**
* Returns the value of the ROW_COUNT parameter, or -1 if it is missing or malformed.
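*
* Illustrative sketch (the map below is hypothetical):
* <pre>{@code
*   Map<String, String> params =
*       ImmutableMap.of(StatsSetupConst.ROW_COUNT, "1000");
*   long rows = FeCatalogUtils.getRowCount(params);   // 1000
*   long size = FeCatalogUtils.getTotalSize(params);  // -1, TOTAL_SIZE is not set
* }</pre>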
*/
public static long getRowCount(Map<String, String> parameters) {
return getLongParam(StatsSetupConst.ROW_COUNT, parameters);
}
public static long getTotalSize(Map<String, String> parameters) {
return getLongParam(StatsSetupConst.TOTAL_SIZE, parameters);
}
private static long getLongParam(String key, Map<String, String> parameters) {
if (parameters == null) return -1;
String value = parameters.get(key);
if (value == null) return -1;
try {
return Long.valueOf(value);
} catch (NumberFormatException exc) {
// ignore
}
return -1;
}
/**
* Convenience method to load exactly one partition from a table.
*
* TODO(todd): upon moving to Java8 this could be a default method
* in FeFsTable.
*/
public static FeFsPartition loadPartition(FeFsTable table,
long partitionId) {
Collection<? extends FeFsPartition> partCol = table.loadPartitions(
Collections.singleton(partitionId));
if (partCol.size() != 1) {
throw new AssertionError(String.format(
"expected exactly one result fetching partition ID %s from table %s " +
"(got %s)", partitionId, table.getFullName(), partCol.size()));
}
return Iterables.getOnlyElement(partCol);
}
/**
* Load all partitions from the given table.
*/
public static Collection<? extends FeFsPartition> loadAllPartitions(
FeFsTable table) {
return table.loadPartitions(table.getPartitionIds());
}
/**
* Parse the partition key values out of their stringified format used by HMS.
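*
* Illustrative sketch, assuming an already-loaded FeFsTable 'tbl' partitioned by
* (year INT, month INT); the values below are hypothetical:
* <pre>{@code
*   List<LiteralExpr> keys = FeCatalogUtils.parsePartitionKeyValues(
*       tbl, ImmutableList.of("2024", tbl.getNullPartitionKeyValue()));
*   // keys.get(0) is an integer literal, keys.get(1) is a NullLiteral
* }</pre>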
*/
public static List<LiteralExpr> parsePartitionKeyValues(FeFsTable table,
List<String> hmsPartitionValues) throws CatalogException {
Preconditions.checkArgument(
hmsPartitionValues.size() == table.getNumClusteringCols(),
"Cannot parse partition values '%s' for table %s: " +
"expected %d values but got %d",
hmsPartitionValues, table.getFullName(),
table.getNumClusteringCols(), hmsPartitionValues.size());
List<LiteralExpr> keyValues = new ArrayList<>();
for (String partitionKey : hmsPartitionValues) {
Type type = table.getColumns().get(keyValues.size()).getType();
// Deal with Hive's special NULL partition key.
if (partitionKey.equals(table.getNullPartitionKeyValue())) {
keyValues.add(NullLiteral.create(type));
} else {
try {
keyValues.add(LiteralExpr.create(partitionKey, type));
} catch (Exception ex) {
LOG.warn(String.format(
"Failed to create literal expression: type: %s, value: '%s'",
type.toSql(), partitionKey), ex);
throw new CatalogException("Invalid partition key value of type: " + type,
ex);
}
}
}
for (Expr v: keyValues) v.analyzeNoThrow(null);
return keyValues;
}
/**
* Return a partition name formed by concatenating partition keys and their values,
* compatible with the way Hive names partitions. Reuses Hive's
* org.apache.hadoop.hive.common.FileUtils.makePartName() function to build the name
* string because there are a number of special cases for how partition names are URL
* escaped.
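*
* Illustrative sketch (hypothetical partition): for a table partitioned by
* (year, month), a partition with values (2024, 5) yields "year=2024/month=5";
* characters that are special in paths (e.g. '/') are percent-escaped.
* <pre>{@code
*   String name = FeCatalogUtils.getPartitionName(partition);
* }</pre>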
*
* TODO: this could be a default method in FeFsPartition in Java 8.
*/
public static String getPartitionName(FeFsPartition partition) {
FeFsTable table = partition.getTable();
List<String> partitionCols = new ArrayList<>();
for (int i = 0; i < table.getNumClusteringCols(); ++i) {
partitionCols.add(table.getColumns().get(i).getName());
}
return FileUtils.makePartName(
partitionCols, getPartitionValuesAsStrings(partition, true));
}
// TODO: this could be a default method in FeFsPartition in Java 8.
public static List<String> getPartitionValuesAsStrings(
FeFsPartition partition, boolean mapNullsToHiveKey) {
List<String> ret = new ArrayList<>();
for (LiteralExpr partValue: partition.getPartitionValues()) {
if (mapNullsToHiveKey) {
ret.add(PartitionKeyValue.getPartitionKeyValueString(
partValue, partition.getTable().getNullPartitionKeyValue()));
} else {
ret.add(partValue.getStringValue());
}
}
return ret;
}
// TODO: this could be a default method in FeFsPartition in Java 8.
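// Illustrative sketch (hypothetical partition): for clustering columns (year, month)
// and partition values (2024, NULL) this returns "(year=2024 AND month IS NULL)";
// identifiers are backtick-quoted by ToSqlUtils.getIdentSql() only when needed.
//   String sql = FeCatalogUtils.getConjunctSqlForPartition(part);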
public static String getConjunctSqlForPartition(FeFsPartition part) {
List<String> partColSql = new ArrayList<>();
for (Column partCol: part.getTable().getClusteringColumns()) {
partColSql.add(ToSqlUtils.getIdentSql(partCol.getName()));
}
List<String> conjuncts = new ArrayList<>();
for (int i = 0; i < partColSql.size(); ++i) {
LiteralExpr partVal = part.getPartitionValues().get(i);
String partValSql = partVal.toSql();
if (Expr.IS_NULL_LITERAL.apply(partVal) || partValSql.isEmpty()) {
conjuncts.add(partColSql.get(i) + " IS NULL");
} else {
conjuncts.add(partColSql.get(i) + "=" + partValSql);
}
}
return "(" + Joiner.on(" AND " ).join(conjuncts) + ")";
}
/**
* Return the set of all file formats used in the collection of partitions.
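*
* Illustrative sketch, assuming an already-loaded FeFsTable 'table':
* <pre>{@code
*   Set<HdfsFileFormat> formats =
*       FeCatalogUtils.getFileFormats(FeCatalogUtils.loadAllPartitions(table));
* }</pre>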
*/
public static Set<HdfsFileFormat> getFileFormats(
Iterable<? extends FeFsPartition> partitions) {
Set<HdfsFileFormat> fileFormats = new HashSet<>();
for (FeFsPartition partition : partitions) {
fileFormats.add(partition.getFileFormat());
}
return fileFormats;
}
public static THdfsPartition fsPartitionToThrift(FeFsPartition part,
ThriftObjectType type) {
HdfsStorageDescriptor sd = part.getInputFormatDescriptor();
THdfsPartition thriftHdfsPart = new THdfsPartition(
sd.getLineDelim(),
sd.getFieldDelim(),
sd.getCollectionDelim(),
sd.getMapKeyDelim(),
sd.getEscapeChar(),
sd.getFileFormat().toThrift(),
Expr.treesToThrift(part.getPartitionValues()),
sd.getBlockSize());
thriftHdfsPart.setId(part.getId());
thriftHdfsPart.setLocation(part.getLocationAsThrift());
if (part.getWriteId() >= 0) {
thriftHdfsPart.setWrite_id(part.getWriteId());
}
if (type == ThriftObjectType.FULL) {
thriftHdfsPart.setStats(new TTableStats(part.getNumRows()));
thriftHdfsPart.setAccess_level(part.getAccessLevel());
thriftHdfsPart.setIs_marked_cached(part.isMarkedCached());
// IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
// may try to serialize the returned THdfsPartition after releasing the table's lock,
// and another thread doing DDL may modify the map.
thriftHdfsPart.setHms_parameters(Maps.newHashMap(part.getParameters()));
thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
// Add block location information
long numBlocks = 0;
long totalFileBytes = 0;
for (FileDescriptor fd: part.getFileDescriptors()) {
thriftHdfsPart.addToFile_desc(fd.toThrift());
numBlocks += fd.getNumFileBlocks();
totalFileBytes += fd.getFileLength();
}
thriftHdfsPart.setNum_blocks(numBlocks);
thriftHdfsPart.setTotal_file_size_bytes(totalFileBytes);
}
return thriftHdfsPart;
}
/**
* Populates cache metrics in the input TGetCatalogMetricsResult object.
* No-op if CatalogdMetaProvider is not the configured metadata provider.
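*
* Illustrative sketch, assuming an FeCatalog 'catalog' obtained from the frontend:
* <pre>{@code
*   TGetCatalogMetricsResult metrics = new TGetCatalogMetricsResult();
*   FeCatalogUtils.populateCacheMetrics(catalog, metrics);
* }</pre>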
*/
public static void populateCacheMetrics(
FeCatalog catalog, TGetCatalogMetricsResult metrics) {
Preconditions.checkNotNull(catalog);
Preconditions.checkNotNull(metrics);
// Populate cache stats only if configured in local mode.
if (!BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) return;
Preconditions.checkState(catalog instanceof LocalCatalog);
MetaProvider provider = ((LocalCatalog) catalog).getMetaProvider();
if (!(provider instanceof CatalogdMetaProvider)) return;
CacheStats stats = ((CatalogdMetaProvider) provider).getCacheStats();
metrics.setCache_eviction_count(stats.evictionCount());
metrics.setCache_hit_count(stats.hitCount());
metrics.setCache_load_count(stats.loadCount());
metrics.setCache_load_exception_count(stats.loadExceptionCount());
metrics.setCache_load_success_count(stats.loadSuccessCount());
metrics.setCache_miss_count(stats.missCount());
metrics.setCache_request_count(stats.requestCount());
metrics.setCache_total_load_time(stats.totalLoadTime());
metrics.setCache_avg_load_time(stats.averageLoadPenalty());
metrics.setCache_hit_rate(stats.hitRate());
metrics.setCache_load_exception_rate(stats.loadExceptionRate());
metrics.setCache_miss_rate(stats.missRate());
}
/**
* Returns a debug string for a given list of TCatalogObjects. Includes the unique key
* and version number for each object.
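*
* Illustrative sketch (the list 'updates' is hypothetical; the exact key text comes
* from Catalog.toCatalogObjectKey()):
* <pre>{@code
*   String s = FeCatalogUtils.debugString(updates);
*   // e.g. something like "[TABLE:db1.t1 version: 3,DATABASE:db1 version: 2]"
* }</pre>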
*/
public static String debugString(List<TCatalogObject> objects) {
if (objects == null || objects.size() == 0) return "[]";
List<String> catalogObjs = new ArrayList<>();
for (TCatalogObject object: objects) {
catalogObjs.add(String.format("%s version: %d",
Catalog.toCatalogObjectKey(object), object.catalog_version));
}
return "[" + Joiner.on(",").join(catalogObjs) + "]";
}
}