// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.catalog.local;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.ColumnStats;
import org.apache.impala.catalog.FeCatalogUtils;
import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.PrunablePartition;
import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.thrift.CatalogObjectsConstants;
import org.apache.impala.thrift.THdfsPartition;
import org.apache.impala.thrift.THdfsTable;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.AvroSchemaConverter;
import org.apache.impala.util.AvroSchemaUtils;
import org.apache.impala.util.ListMap;
import org.apache.thrift.TException;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

public class LocalFsTable extends LocalTable implements FeFsTable {
/**
* Map from partition ID to partition spec.
*
* Set by loadPartitionSpecs().
*/
  private ImmutableMap<Long, LocalPartitionSpec> partitionSpecs_;
/**
* For each partition column, a map from value to the set of partition IDs
* having that value.
*
* Set by loadPartitionValueMap().
*/
private List<TreeMap<LiteralExpr, Set<Long>>> partitionValueMap_;
/**
* For each partition column, the set of partition IDs having a NULL value
* for that column.
*
* Set by loadPartitionValueMap().
*/
private List<Set<Long>> nullPartitionIds_;
  /**
   * The string used in data files to represent a NULL column value, as read
   * from the 'serialization.null.format' table property (see the constructor).
   */
private final String nullColumnValue_;
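  // Illustrative example (hypothetical table): for a table created with
  //   CREATE TABLE t (...) TBLPROPERTIES ('serialization.null.format'='\N')
  // nullColumnValue_ is "\N", and text-file cells containing "\N" are read
  // back as NULL.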
/**
* Map assigning integer indexes for the hosts containing blocks for this table.
* This is updated as a side effect of LocalFsPartition.loadFileDescriptors().
*/
private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<>();
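  // The index maps each distinct TNetworkAddress (host/port of a datanode)
  // to a dense integer, so per-file block metadata can refer to hosts by a
  // small int instead of repeating full addresses in the Thrift structures.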
  /**
   * The Avro schema for this table. Non-null if this is an Avro table. For
   * non-Avro tables this is usually null, but may be non-null when an
   * explicit external Avro schema is specified as a table property; such a
   * schema is used when querying Avro-formatted partitions of non-Avro
   * tables.
   */
private final String avroSchema_;
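  // For reference: an explicit schema is typically supplied through the Hive
  // Avro SerDe properties 'avro.schema.literal' or 'avro.schema.url', e.g.
  //   ALTER TABLE t SET TBLPROPERTIES (
  //       'avro.schema.url'='hdfs:///schemas/t.avsc');
  // loadAvroSchema() below searches both the serde and table parameters.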
public static LocalFsTable load(LocalDb db, Table msTbl, TableMetaRef ref) {
String fullName = msTbl.getDbName() + "." + msTbl.getTableName();
// Set Avro schema if necessary.
String avroSchema;
ColumnMap cmap;
try {
// Load the avro schema if it's external (explicitly specified).
avroSchema = loadAvroSchema(msTbl);
      // If the table's format is Avro, we override the columns based on the
      // schema (either inferred or explicit). Otherwise, even if an Avro
      // schema is set, we don't override the table-level columns: in that
      // case the schema is only used for any Avro-formatted partitions.
if (isAvroFormat(msTbl)) {
if (avroSchema == null) {
// No Avro schema was explicitly set in the table metadata, so infer the Avro
// schema from the column definitions.
Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
msTbl.getSd().getCols(), fullName);
avroSchema = inferredSchema.toString();
}
List<FieldSchema> reconciledFieldSchemas = AvroSchemaUtils.reconcileAvroSchema(
msTbl, avroSchema);
Table msTblWithExplicitAvroSchema = msTbl.deepCopy();
msTblWithExplicitAvroSchema.getSd().setCols(reconciledFieldSchemas);
cmap = ColumnMap.fromMsTable(msTblWithExplicitAvroSchema);
} else {
cmap = ColumnMap.fromMsTable(msTbl);
}
return new LocalFsTable(db, msTbl, ref, cmap, avroSchema);
    } catch (AnalysisException e) {
      throw new LocalCatalogException("Failed to load Avro schema for table "
          + fullName, e);
    }
}
private LocalFsTable(LocalDb db, Table msTbl, TableMetaRef ref, ColumnMap cmap,
String explicitAvroSchema) {
super(db, msTbl, ref, cmap);
// set NULL indicator string from table properties
String tableNullFormat =
msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
nullColumnValue_ = tableNullFormat != null ? tableNullFormat :
FeFsTable.DEFAULT_NULL_COLUMN_VALUE;
avroSchema_ = explicitAvroSchema;
}
private static String loadAvroSchema(Table msTbl) throws AnalysisException {
List<Map<String, String>> schemaSearchLocations = ImmutableList.of(
msTbl.getSd().getSerdeInfo().getParameters(),
msTbl.getParameters());
// TODO(todd): we should consider moving this to the MetaProvider interface
// so that it can more easily be cached rather than re-loaded from HDFS on
// each table reference.
return AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
}
/**
* Creates a temporary FsTable object populated with the specified properties.
* This is used for CTAS statements.
*/
public static LocalFsTable createCtasTarget(LocalDb db,
Table msTbl) throws CatalogException {
return new LocalFsTable(db, msTbl, /*ref=*/null, ColumnMap.fromMsTable(msTbl),
/*explicitAvroSchema=*/null);
}
@Override
public boolean isCacheable() {
    // TODO(todd): HDFS caching is not yet supported in LocalCatalog.
return false;
}
@Override
public boolean isLocationCacheable() {
    // TODO(todd): HDFS caching is not yet supported in LocalCatalog.
return false;
}
@Override
public boolean isMarkedCached() {
    // TODO(todd): HDFS caching is not yet supported in LocalCatalog.
return false;
}
@Override
public String getLocation() {
return getMetaStoreTable().getSd().getLocation();
}
@Override
public String getNullPartitionKeyValue() {
return db_.getCatalog().getNullPartitionKeyValue();
}
@Override
public String getHdfsBaseDir() {
    // TODO(todd): this seems redundant with getLocation().
return getLocation();
}
@Override
public long getTotalHdfsBytes() {
    // TODO(todd): this is slow because it requires loading all partitions.
    // Remove if possible.
long size = 0;
for (FeFsPartition p: loadPartitions(getPartitionIds())) {
size += p.getSize();
}
return size;
}
@Override
public boolean usesAvroSchemaOverride() {
return isAvroFormat(msTable_);
}
@Override
public Set<HdfsFileFormat> getFileFormats() {
    // TODO(todd): can we avoid loading all partitions here? This is called
    // for any INSERT query, even if the target partition is specified.
Collection<? extends FeFsPartition> parts;
if (ref_ != null) {
parts = FeCatalogUtils.loadAllPartitions(this);
} else {
// If this is a CTAS target, we don't want to try to load the partition list.
parts = Collections.emptyList();
}
// In the case that we have no partitions added to the table yet, it's
// important to add the "prototype" partition as a fallback.
Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
parts, Collections.singleton(createPrototypePartition()));
return FeCatalogUtils.getFileFormats(partitionsToConsider);
}
@Override
public boolean hasWriteAccessToBaseDir() {
// TODO(todd): implement me properly
return true;
}
@Override
public String getFirstLocationWithoutWriteAccess() {
// TODO(todd): implement me properly
return null;
}
@Override
public TResultSet getTableStats() {
return HdfsTable.getTableStats(this);
}
@Override
public FileSystemUtil.FsType getFsType() {
Preconditions.checkNotNull(getHdfsBaseDir(),
"LocalTable base dir is null");
Path hdfsBaseDirPath = new Path(getHdfsBaseDir());
Preconditions.checkNotNull(hdfsBaseDirPath.toUri().getScheme(),
"Cannot get scheme from path " + getHdfsBaseDir());
return FileSystemUtil.FsType.getFsType(hdfsBaseDirPath.toUri().getScheme());
}
@Override
public TTableDescriptor toThriftDescriptor(int tableId,
Set<Long> referencedPartitions) {
if (referencedPartitions == null) {
// null means "all partitions".
referencedPartitions = getPartitionIds();
}
Map<Long, THdfsPartition> idToPartition = new HashMap<>();
List<? extends FeFsPartition> partitions = loadPartitions(referencedPartitions);
for (FeFsPartition partition : partitions) {
idToPartition.put(partition.getId(),
FeCatalogUtils.fsPartitionToThrift(partition,
ThriftObjectType.DESCRIPTOR_ONLY));
}
THdfsPartition tPrototypePartition = FeCatalogUtils.fsPartitionToThrift(
createPrototypePartition(), ThriftObjectType.DESCRIPTOR_ONLY);
THdfsTable hdfsTable = new THdfsTable(getHdfsBaseDir(), getColumnNames(),
getNullPartitionKeyValue(), nullColumnValue_, idToPartition,
tPrototypePartition);
if (avroSchema_ != null) {
hdfsTable.setAvroSchema(avroSchema_);
} else if (hasAnyAvroPartition(partitions)) {
// Need to infer an Avro schema for the backend to use if any of the
// referenced partitions are Avro, even if the table is mixed-format.
hdfsTable.setAvroSchema(AvroSchemaConverter.convertFieldSchemas(
getMetaStoreTable().getSd().getCols(), getFullName()).toString());
}
TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
FeCatalogUtils.getTColumnDescriptors(this),
getNumClusteringCols(), name_, db_.getName());
tableDesc.setHdfsTable(hdfsTable);
return tableDesc;
}
private static boolean isAvroFormat(Table msTbl) {
String inputFormat = msTbl.getSd().getInputFormat();
return HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO;
}
private static boolean hasAnyAvroPartition(List<? extends FeFsPartition> partitions) {
for (FeFsPartition p : partitions) {
if (p.getFileFormat() == HdfsFileFormat.AVRO) return true;
}
return false;
}
private LocalFsPartition createPrototypePartition() {
Partition protoMsPartition = new Partition();
// The prototype partition should not have a location set in its storage
// descriptor, or else all inserted files will end up written into the
// table directory instead of the new partition directories.
StorageDescriptor sd = getMetaStoreTable().getSd().deepCopy();
sd.unsetLocation();
protoMsPartition.setSd(sd);
protoMsPartition.setParameters(Collections.<String, String>emptyMap());
LocalPartitionSpec spec = new LocalPartitionSpec(
this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
LocalFsPartition prototypePartition = new LocalFsPartition(
this, spec, protoMsPartition, /*fileDescriptors=*/null, /*partitionStats=*/null,
/*hasIncrementalStats=*/false);
return prototypePartition;
}
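  // Note: the prototype partition acts as a template descriptor. When an
  // INSERT targets a partition that does not exist yet, the backend builds
  // the new partition's metadata from this prototype, deriving its location
  // from the table directory plus the new partition's key values.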
@Override
public Collection<? extends PrunablePartition> getPartitions() {
loadPartitionSpecs();
return partitionSpecs_.values();
}
@Override
public Set<Long> getPartitionIds() {
loadPartitionSpecs();
return partitionSpecs_.keySet();
}
@Override
public Map<Long, ? extends PrunablePartition> getPartitionMap() {
loadPartitionSpecs();
return partitionSpecs_;
}
@Override
public TreeMap<LiteralExpr, Set<Long>> getPartitionValueMap(int col) {
loadPartitionValueMap();
return partitionValueMap_.get(col);
}
@Override
public Set<Long> getNullPartitionIds(int colIdx) {
loadPartitionValueMap();
return nullPartitionIds_.get(colIdx);
}
@Override
public List<? extends FeFsPartition> loadPartitions(Collection<Long> ids) {
// TODO(todd) it seems like some queries actually call this multiple times.
// Perhaps we should store the result in this class, instead of relying on
// catalog-layer caching?
Preconditions.checkState(partitionSpecs_ != null,
"Cannot load partitions without having fetched partition IDs " +
"from the same LocalFsTable instance");
// Possible in the case that all partitions were pruned.
if (ids.isEmpty()) return Collections.emptyList();
List<PartitionRef> refs = new ArrayList<>();
for (Long id : ids) {
LocalPartitionSpec spec = partitionSpecs_.get(id);
Preconditions.checkArgument(spec != null, "Invalid partition ID for table %s: %s",
getFullName(), id);
refs.add(Preconditions.checkNotNull(spec.getRef()));
}
Map<String, PartitionMetadata> partsByName;
try {
partsByName = db_.getCatalog().getMetaProvider().loadPartitionsByRefs(
ref_, getClusteringColumnNames(), hostIndex_, refs);
} catch (TException e) {
throw new LocalCatalogException(
"Could not load partitions for table " + getFullName(), e);
}
List<FeFsPartition> ret = Lists.newArrayListWithCapacity(ids.size());
for (Long id : ids) {
LocalPartitionSpec spec = partitionSpecs_.get(id);
PartitionMetadata p = partsByName.get(spec.getRef().getName());
if (p == null) {
// TODO(todd): concurrent drop partition could result in this error.
// Should we recover in a more graceful way from such an unexpected event?
throw new LocalCatalogException(
"Could not load expected partitions for table " + getFullName() +
": missing expected partition with name '" + spec.getRef().getName() +
"' (perhaps it was concurrently dropped by another process)");
}
LocalFsPartition part = new LocalFsPartition(this, spec, p.getHmsPartition(),
p.getFileDescriptors(), p.getPartitionStats(), p.hasIncrementalStats());
ret.add(part);
}
return ret;
}
private List<String> getClusteringColumnNames() {
List<String> names = Lists.newArrayListWithCapacity(getNumClusteringCols());
for (Column c : getClusteringColumns()) {
names.add(c.getName());
}
return names;
}
private void loadPartitionValueMap() {
if (partitionValueMap_ != null) return;
loadPartitionSpecs();
    List<TreeMap<LiteralExpr, Set<Long>>> valMapByCol = new ArrayList<>();
List<Set<Long>> nullParts = new ArrayList<>();
for (int i = 0; i < getNumClusteringCols(); i++) {
valMapByCol.add(new TreeMap<>());
nullParts.add(new HashSet<>());
}
for (LocalPartitionSpec partition : partitionSpecs_.values()) {
List<LiteralExpr> vals = partition.getPartitionValues();
for (int i = 0; i < getNumClusteringCols(); i++) {
LiteralExpr val = vals.get(i);
if (Expr.IS_NULL_LITERAL.apply(val)) {
nullParts.get(i).add(partition.getId());
continue;
}
Set<Long> ids = valMapByCol.get(i).get(val);
if (ids == null) {
ids = new HashSet<>();
valMapByCol.get(i).put(val, ids);
}
ids.add(partition.getId());
}
}
partitionValueMap_ = valMapByCol;
nullPartitionIds_ = nullParts;
}
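  // Worked example (hypothetical table): if the table is partitioned by
  // (year, month) and has partitions (2019,1)=id 0, (2019,2)=id 1,
  // (2020,1)=id 2, and (NULL,1)=id 3, loadPartitionValueMap() yields:
  //   partitionValueMap_.get(0) = {2019 -> {0, 1}, 2020 -> {2}}
  //   partitionValueMap_.get(1) = {1 -> {0, 2, 3}, 2 -> {1}}
  //   nullPartitionIds_.get(0)  = {3}
  //   nullPartitionIds_.get(1)  = {}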
private void loadPartitionSpecs() {
if (partitionSpecs_ != null) return;
if (ref_ == null) {
// This is a CTAS target. Don't try to load metadata.
partitionSpecs_ = ImmutableMap.of();
return;
}
List<PartitionRef> partList;
try {
partList = db_.getCatalog().getMetaProvider().loadPartitionList(ref_);
} catch (TException e) {
throw new LocalCatalogException("Could not load partition names for table " +
getFullName(), e);
}
ImmutableMap.Builder<Long, LocalPartitionSpec> b = new ImmutableMap.Builder<>();
long id = 0;
for (PartitionRef part: partList) {
b.put(id, new LocalPartitionSpec(this, part, id));
id++;
}
partitionSpecs_ = b.build();
}
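  // Note that partition IDs are assigned sequentially from 0 as the list is
  // loaded, so they are only meaningful within this LocalFsTable instance
  // (hence the precondition in loadPartitions() above).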
/**
* Override base implementation to populate column stats for
* clustering columns based on the partition map.
*/
@Override
protected void loadColumnStats() {
super.loadColumnStats();
    // TODO(todd): this is called for all tables, which forces loading all
    // partition names even when the clustering-column stats are never used.
loadPartitionValueMap();
for (int i = 0; i < getNumClusteringCols(); i++) {
ColumnStats stats = getColumns().get(i).getStats();
int nonNullParts = partitionValueMap_.get(i).size();
int nullParts = nullPartitionIds_.get(i).size();
stats.setNumDistinctValues(nonNullParts + (nullParts > 0 ? 1 : 0));
// TODO(todd): this calculation ends up setting the num_nulls stat
// to the number of partitions with null rows, not the number of rows.
// However, it maintains the existing behavior from HdfsTable.
stats.setNumNulls(nullParts);
}
}
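  // Worked example (hypothetical): if clustering column 0 has partitions
  // with the values {2019, 2020} plus two partitions with a NULL value, the
  // loop above sets numDistinctValues = 2 + 1 = 3 (NULL counted once) and
  // numNulls = 2 (the number of NULL partitions, not NULL rows, as noted).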
@Override
public ListMap<TNetworkAddress> getHostIndex() {
return hostIndex_;
}
@Override
public List<SQLPrimaryKey> getPrimaryKeys() {
    // TODO: constraint metadata is not yet loaded in LocalCatalog mode.
    return null;
}
@Override
public List<SQLForeignKey> getForeignKeys() {
    // TODO: constraint metadata is not yet loaded in LocalCatalog mode.
    return null;
}
}