// blob: bd17e355ca2fab78e2d110fcf90d4a3fa35e5f7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.metastore.store;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.MetadataException;
import org.apache.drill.exec.metastore.MetastoreMetadataProviderManager;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.parquet.ParquetTableMetadataUtils;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.metastore.components.tables.BasicTablesRequests;
import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
import org.apache.drill.metastore.metadata.BaseTableMetadata;
import org.apache.drill.metastore.metadata.FileMetadata;
import org.apache.drill.metastore.metadata.NonInterestingColumnsMetadata;
import org.apache.drill.metastore.metadata.PartitionMetadata;
import org.apache.drill.metastore.metadata.SegmentMetadata;
import org.apache.drill.metastore.metadata.TableInfo;
import org.apache.drill.metastore.metadata.TableMetadata;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.drill.metastore.metadata.TableMetadataProviderBuilder;
import org.apache.drill.metastore.statistics.ColumnStatistics;
import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
import org.apache.drill.metastore.statistics.Statistic;
import org.apache.drill.metastore.statistics.StatisticsHolder;
import org.apache.drill.metastore.util.SchemaPathUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Implementation of {@link TableMetadataProvider} which uses Drill Metastore for providing table metadata
 * for file-based tables.
 * <p>
 * Metadata is requested from the Metastore through {@link BasicTablesRequests}. Table, partitions, files,
 * segments and non-interesting columns metadata are requested on first access and then kept in fields of
 * this instance. Every metadata accessor first calls {@link #throwIfChanged()} to detect that the Metastore
 * table info differs from the one captured when this provider was created.
 */
public class MetastoreFileTableMetadataProvider implements TableMetadataProvider {

  private static final Logger logger = LoggerFactory.getLogger(MetastoreFileTableMetadataProvider.class);

  /** Entry point for all Metastore table requests issued by this provider. */
  protected final BasicTablesRequests basicTablesRequests;
  /** Identifies the table whose metadata is provided. */
  protected final TableInfo tableInfo;
  /** Metastore table info captured at construction time; used by {@link #throwIfChanged()}. */
  protected final MetastoreTableInfo metastoreTableInfo;
  /** Schema passed into the builder or read from the schema provider; may be {@code null}. */
  protected final TupleMetadata schema;
  /** File paths (without scheme and authority) used to request files metadata. */
  protected final List<String> paths;
  /** Optional statistics provider; may be {@code null}. */
  protected final DrillStatsTable statsProvider;
  /** Builder for the fallback provider for the case when required metadata is absent in Metastore. */
  protected final TableMetadataProviderBuilder fallbackBuilder;

  // values read from the metadata provider manager config
  protected final boolean useSchema;
  protected final boolean useStatistics;
  protected final boolean fallbackToFileMetadata;

  // lazily populated metadata, see the corresponding getters
  protected BaseTableMetadata tableMetadata;
  protected Map<Path, SegmentMetadata> segmentsMetadata;
  protected List<PartitionMetadata> partitions;
  protected Map<Path, FileMetadata> files;
  private NonInterestingColumnsMetadata nonInterestingColumnsMetadata;

  /**
   * Creates the provider from the state collected in the specified builder.
   * <p>
   * Note that this constructor calls {@link #getFilesMetadataMap()}, so it already issues Metastore requests
   * and may throw {@link MetadataException} from {@link #throwIfChanged()}.
   *
   * @param builder builder which holds the metadata provider manager, schema, paths and fallback builder
   */
  protected MetastoreFileTableMetadataProvider(Builder<?> builder) {
    SchemaProvider schemaProvider = builder.metadataProviderManager.getSchemaProvider();
    TupleMetadata schema = builder.schema;
    // schema passed into the builder has greater priority
    if (schema == null && schemaProvider != null) {
      try {
        schema = schemaProvider.read().getSchema();
      } catch (IOException e) {
        // best effort: failure to read the schema is logged and the query proceeds without it
        logger.warn("Unable to read schema from schema provider [{}]: {}.\n" +
            "Query execution will continue without using the schema.",
            builder.metadataProviderManager.getTableInfo().name(), e.getMessage());
        logger.trace("Error when reading the schema", e);
      }
    }

    this.basicTablesRequests = builder.metadataProviderManager.getMetastoreRegistry().get().tables().basicRequests();
    this.tableInfo = builder.metadataProviderManager.getTableInfo();
    this.metastoreTableInfo = basicTablesRequests.metastoreTableInfo(tableInfo);
    this.useSchema = builder.metadataProviderManager.getConfig().useSchema();
    this.useStatistics = builder.metadataProviderManager.getConfig().useStatistics();
    this.fallbackToFileMetadata = builder.metadataProviderManager.getConfig().fallbackToFileMetadata();
    this.schema = schema;
    this.fallbackBuilder = builder.fallback;
    this.statsProvider = builder.metadataProviderManager.getStatsProvider();
    this.paths = builder.paths;

    TableMetadataProvider source = builder.metadataProviderManager.getTableMetadataProvider();
    // store this provider into the metadata provider manager to be able to reuse its results
    // when creating new instances; an existing provider is replaced only if it knows fewer files
    if (source == null || source.getFilesMetadataMap().size() < getFilesMetadataMap().size()) {
      builder.metadataProviderManager.setTableMetadataProvider(this);
    }
  }

  /**
   * Checks whether the Metastore table info has changed since this provider was created.
   *
   * @throws MetadataException of type {@code INCONSISTENT_METADATA} if
   *         {@link BasicTablesRequests#hasMetastoreTableInfoChanged} reports a change
   */
  protected void throwIfChanged() {
    if (basicTablesRequests.hasMetastoreTableInfoChanged(metastoreTableInfo)) {
      throw MetadataException.of(MetadataException.MetadataExceptionType.INCONSISTENT_METADATA);
    }
  }

  /**
   * Returns table metadata, requesting it from the Metastore on the first call.
   * <p>
   * If a schema is available in this provider, it replaces the schema of the Metastore table metadata.
   * If statistics usage is disabled, columns statistics are dropped. If a statistics provider is present,
   * its statistics are applied to the resulting metadata.
   *
   * @return table metadata
   * @throws MetadataException of type {@code ABSENT_SCHEMA} if no schema is available in this provider
   *         and usage of the Metastore schema is disabled
   */
  @Override
  public TableMetadata getTableMetadata() {
    throwIfChanged();
    if (tableMetadata == null) {
      if (schema == null && !useSchema) {
        throw MetadataException.of(MetadataException.MetadataExceptionType.ABSENT_SCHEMA);
      }

      // metadata is assembled in a local variable and published to the field only when it is complete,
      // so a failure in one of the steps below does not leave partially built metadata cached
      BaseTableMetadata metadata = basicTablesRequests.tableMetadata(tableInfo);
      if (schema != null) {
        metadata = metadata.toBuilder()
            .schema(schema)
            .build();
      }

      if (!useStatistics) {
        // removes statistics to prevent its usage later
        metadata = metadata.toBuilder()
            .columnsStatistics(Collections.emptyMap())
            .build();
      }

      if (statsProvider != null) {
        if (!statsProvider.isMaterialized()) {
          statsProvider.materialize();
        }
        metadata = metadata.cloneWithStats(
            ParquetTableMetadataUtils.getColumnStatistics(metadata.getSchema(), statsProvider),
            DrillStatsTable.getEstimatedTableStats(statsProvider));
      }

      tableMetadata = metadata;
    }
    return tableMetadata;
  }

  /**
   * Returns partition columns built from the partition keys stored in the Metastore.
   * The result is requested from the Metastore on every call.
   *
   * @return list of partition columns
   */
  @Override
  public List<SchemaPath> getPartitionColumns() {
    throwIfChanged();
    return basicTablesRequests.interestingColumnsAndPartitionKeys(tableInfo).partitionKeys().values().stream()
        .map(SchemaPath::getSimplePath)
        .collect(Collectors.toList());
  }

  /**
   * Returns partitions metadata, requesting it from the Metastore on the first call.
   *
   * @return list of partitions metadata
   */
  @Override
  public List<PartitionMetadata> getPartitionsMetadata() {
    throwIfChanged();
    if (partitions == null) {
      partitions = basicTablesRequests.partitionsMetadata(tableInfo, null, null);
    }
    return partitions;
  }

  /**
   * Returns partitions metadata for the specified column.
   * The result is requested from the Metastore on every call.
   *
   * @param columnName partition column
   * @return list of partitions metadata for the column
   */
  @Override
  public List<PartitionMetadata> getPartitionMetadata(SchemaPath columnName) {
    throwIfChanged();
    return basicTablesRequests.partitionsMetadata(tableInfo, null, columnName.getRootSegmentPath());
  }

  /**
   * Returns metadata of the files from {@link #paths} keyed by file path,
   * requesting it from the Metastore on the first call.
   *
   * @return map of file path to file metadata
   */
  @Override
  public Map<Path, FileMetadata> getFilesMetadataMap() {
    throwIfChanged();
    if (files == null) {
      files = basicTablesRequests.filesMetadata(tableInfo, null, paths).stream()
          .collect(Collectors.toMap(FileMetadata::getPath, Function.identity()));
    }
    return files;
  }

  /**
   * Returns segments metadata keyed by segment path, requesting it from the Metastore on the first call.
   *
   * @return map of segment path to segment metadata
   */
  @Override
  public Map<Path, SegmentMetadata> getSegmentsMetadataMap() {
    throwIfChanged();
    if (segmentsMetadata == null) {
      segmentsMetadata = basicTablesRequests.segmentsMetadataByColumn(tableInfo, null, null).stream()
          .collect(Collectors.toMap(SegmentMetadata::getPath, Function.identity()));
    }
    return segmentsMetadata;
  }

  /**
   * Returns metadata of the file with the specified location.
   * The result is requested from the Metastore on every call.
   *
   * @param location file location; only the path component of its URI is passed to the Metastore
   * @return file metadata
   */
  @Override
  public FileMetadata getFileMetadata(Path location) {
    throwIfChanged();
    return basicTablesRequests.fileMetadata(tableInfo, null, location.toUri().getPath());
  }

  /**
   * Returns metadata of the files which belong to the specified partition.
   * The result is requested from the Metastore on every call.
   *
   * @param partition partition whose locations are used to request files metadata
   * @return list of files metadata
   */
  @Override
  public List<FileMetadata> getFilesForPartition(PartitionMetadata partition) {
    throwIfChanged();
    List<String> partitionPaths = partition.getLocations().stream()
        .map(path -> path.toUri().getPath())
        .collect(Collectors.toList());
    return basicTablesRequests.filesMetadata(tableInfo, null, partitionPaths);
  }

  /**
   * Returns metadata for non-interesting columns, building it on the first call.
   * <p>
   * Every schema column which is not interesting or which is an array gets column statistics
   * holding {@link Statistic#NO_COLUMN_STATS} as {@link ColumnStatisticsKind#NULLS_COUNT}.
   *
   * @return non-interesting columns metadata
   */
  @Override
  public NonInterestingColumnsMetadata getNonInterestingColumnsMetadata() {
    throwIfChanged();
    if (nonInterestingColumnsMetadata == null) {
      TupleMetadata tableSchema = getTableMetadata().getSchema();

      List<StatisticsHolder<?>> statistics = Collections.singletonList(
          new StatisticsHolder<>(Statistic.NO_COLUMN_STATS, ColumnStatisticsKind.NULLS_COUNT));

      List<SchemaPath> columnPaths = SchemaUtil.getSchemaPaths(tableSchema);
      List<SchemaPath> interestingColumns = getInterestingColumns(columnPaths);
      // populates statistics for non-interesting columns and columns for which statistics wasn't collected
      Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = columnPaths.stream()
          .filter(schemaPath -> !interestingColumns.contains(schemaPath)
              || SchemaPathUtils.getColumnMetadata(schemaPath, tableSchema).isArray())
          .collect(Collectors.toMap(
              Function.identity(),
              schemaPath -> new ColumnStatistics<>(statistics,
                  SchemaPathUtils.getColumnMetadata(schemaPath, tableSchema).type())));
      nonInterestingColumnsMetadata = new NonInterestingColumnsMetadata(columnsStatistics);
    }
    return nonInterestingColumnsMetadata;
  }

  /**
   * @return always {@code true}
   */
  @Override
  public boolean checkMetadataVersion() {
    return true;
  }

  /**
   * Returns the columns treated as interesting.
   *
   * @param columnPaths all columns of the table schema; returned as is when table metadata
   *                    holds no interesting columns list
   * @return interesting columns, or empty list if statistics usage is disabled
   */
  private List<SchemaPath> getInterestingColumns(List<SchemaPath> columnPaths) {
    if (!useStatistics) {
      // if `metastore.metadata.use_statistics` is false, all columns are treated as non-interesting
      return Collections.emptyList();
    }
    // table metadata is obtained once to avoid a repeated Metastore consistency check
    List<SchemaPath> interestingColumns = getTableMetadata().getInterestingColumns();
    return interestingColumns == null ? columnPaths : interestingColumns;
  }

  /**
   * Builder for {@link MetastoreFileTableMetadataProvider}.
   *
   * @param <T> concrete builder type returned from the {@code with*} methods
   */
  public static class Builder<T extends Builder<T>> implements FileTableMetadataProviderBuilder<T> {

    protected final MetastoreMetadataProviderManager metadataProviderManager;

    // builder for the fallback provider
    // for the case when required metadata is absent in Metastore
    protected final TableMetadataProviderBuilder fallback;

    protected TupleMetadata schema;

    // populated in build() from the selection
    protected List<String> paths;

    private FileSelection selection;
    private DrillFileSystem fs;

    /**
     * Creates the builder which uses {@link SimpleFileTableMetadataProvider.Builder} as the fallback builder.
     *
     * @param source metadata provider manager
     */
    public Builder(MetastoreMetadataProviderManager source) {
      this(source, new SimpleFileTableMetadataProvider.Builder(FileSystemMetadataProviderManager.init()));
    }

    /**
     * Creates the builder with the specified fallback builder.
     *
     * @param source   metadata provider manager
     * @param fallback builder for the fallback provider
     */
    protected Builder(MetastoreMetadataProviderManager source, TableMetadataProviderBuilder fallback) {
      this.metadataProviderManager = source;
      this.fallback = fallback;
    }

    /**
     * Sets the schema; it takes priority over the schema read from the schema provider.
     *
     * @param schema table schema
     * @return this builder
     */
    @Override
    public T withSchema(TupleMetadata schema) {
      this.schema = schema;
      return self();
    }

    /**
     * Sets the file selection used in {@link #build()} to determine file paths.
     *
     * @param selection file selection
     * @return this builder
     */
    public T withSelection(FileSelection selection) {
      this.selection = selection;
      return self();
    }

    /**
     * Sets the file system used in {@link #build()} to list files of a selection that is not fully expanded.
     *
     * @param fs file system
     * @return this builder
     */
    public T withFileSystem(DrillFileSystem fs) {
      this.fs = fs;
      return self();
    }

    /**
     * @return this builder cast to its concrete type
     */
    @SuppressWarnings("unchecked") // safe as long as subclasses follow the Builder<T extends Builder<T>> contract
    protected T self() {
      return (T) this;
    }

    public MetastoreMetadataProviderManager metadataProviderManager() {
      return metadataProviderManager;
    }

    public FileSelection selection() {
      return selection;
    }

    public DrillFileSystem fs() {
      return fs;
    }

    /**
     * Determines file paths from the selection and creates the provider.
     * <p>
     * For a fully expanded selection its files are used; otherwise files are listed recursively
     * under the selection root. Scheme and authority are removed from every path.
     *
     * @return table metadata provider
     * @throws IOException if listing of files fails
     */
    @Override
    public TableMetadataProvider build() throws IOException {
      FileSelection fileSelection = selection();
      if (fileSelection.isExpandedFully()) {
        paths = fileSelection.getFiles().stream()
            .map(path -> Path.getPathWithoutSchemeAndAuthority(path).toUri().getPath())
            .collect(Collectors.toList());
      } else {
        paths = DrillFileSystemUtil.listFiles(fs, fileSelection.getSelectionRoot(), true).stream()
            .map(fileStatus -> Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toUri().getPath())
            .collect(Collectors.toList());
      }
      return new MetastoreFileTableMetadataProvider(this);
    }
  }
}