blob: 91f54918dba8ddf02b0219c838c72bbeeb6eeb6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.metastore.store.parquet;
import org.apache.drill.exec.exception.MetadataException;
import org.apache.drill.exec.metastore.store.FileSystemMetadataProviderManager;
import org.apache.drill.exec.metastore.store.MetastoreFileTableMetadataProvider;
import org.apache.drill.exec.metastore.MetastoreMetadataProviderManager;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.metastore.metadata.RowGroupMetadata;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Implementation of {@link TableMetadataProvider} which uses Drill Metastore for providing
 * table metadata for parquet tables.
 *
 * <p>Row group metadata is requested from the Metastore on first access and cached in the instance.
 * When the Metastore returns no row groups, the provider either delegates to a lazily built
 * file-based fallback provider or fails with a {@link MetadataException}, depending on the
 * inherited {@code fallbackToFileMetadata} flag.</p>
 */
public class MetastoreParquetTableMetadataProvider extends MetastoreFileTableMetadataProvider
    implements ParquetTableMetadataProvider {

  private final List<ReadEntryWithPath> entries;

  // stores builder to provide lazy init for fallback ParquetTableMetadataProvider
  private final ParquetTableMetadataProviderBuilder<?> fallbackBuilder;

  // cache of row group metadata; stays null until a lookup has completed successfully
  private Multimap<Path, RowGroupMetadata> rowGroups;

  // fallback provider, created from fallbackBuilder on first use
  private ParquetMetadataProvider fallback;

  /**
   * Creates the provider from the builder state and, when appropriate, registers this instance
   * in the builder's metadata provider manager.
   *
   * @param builder builder holding entries, fallback builder and metadata provider manager
   */
  private MetastoreParquetTableMetadataProvider(Builder builder) {
    super(builder);

    this.entries = builder.entries;
    this.fallbackBuilder = builder.fallback;

    ParquetMetadataProvider source =
        (ParquetTableMetadataProvider) builder.metadataProviderManager().getTableMetadataProvider();
    // store results into metadataProviderManager to be able to use them when creating new instances
    // for the case when source wasn't provided or it contains less row group metadata than the provider
    if (source == null || source.getRowGroupsMeta().size() < getRowGroupsMeta().size()) {
      builder.metadataProviderManager().setTableMetadataProvider(this);
    }
  }

  /**
   * @return always {@code false} for this provider
   */
  @Override
  public boolean isUsedMetadataCache() {
    return false;
  }

  /**
   * @return location taken from the table metadata
   */
  @Override
  public Path getSelectionRoot() {
    return getTableMetadata().getLocation();
  }

  /**
   * @return read entries supplied to the builder, or the ones derived from the selection
   *         in {@link Builder#build()}
   */
  @Override
  public List<ReadEntryWithPath> getEntries() {
    return entries;
  }

  /**
   * @return new list with all values of {@link #getRowGroupsMetadataMap()}
   */
  @Override
  public List<RowGroupMetadata> getRowGroupsMeta() {
    return new ArrayList<>(getRowGroupsMetadataMap().values());
  }

  /**
   * @return new list with the keys of the files metadata map
   */
  @Override
  public List<Path> getLocations() {
    return new ArrayList<>(getFilesMetadataMap().keySet());
  }

  /**
   * Returns row group metadata grouped by file path, loading it on first successful call.
   *
   * <p>The result is cached only after the lookup has succeeded. If the Metastore request or
   * the fallback fails, nothing is cached, so the next call repeats the lookup and reports the
   * failure again instead of returning an empty map.</p>
   *
   * @return multimap of file path to row group metadata
   * @throws MetadataException with type {@code INCOMPLETE_METADATA} if the Metastore returned
   *         no row groups and fallback is disabled, or with type {@code FALLBACK_EXCEPTION}
   *         if the fallback provider failed with an {@link IOException}
   */
  @Override
  public Multimap<Path, RowGroupMetadata> getRowGroupsMetadataMap() {
    throwIfChanged();
    if (rowGroups != null) {
      return rowGroups;
    }

    // collect into a local multimap so that a failure below cannot leave an empty cache behind
    Multimap<Path, RowGroupMetadata> fromMetastore = LinkedListMultimap.create();
    basicTablesRequests.rowGroupsMetadata(tableInfo, null, paths).stream()
        .collect(Collectors.groupingBy(RowGroupMetadata::getPath, Collectors.toList()))
        .forEach((path, rowGroupMetadata) -> fromMetastore.putAll(path, rowGroupMetadata));

    Multimap<Path, RowGroupMetadata> resolved = fromMetastore;
    if (fromMetastore.isEmpty()) {
      if (!fallbackToFileMetadata) {
        throw MetadataException.of(MetadataException.MetadataExceptionType.INCOMPLETE_METADATA);
      }
      try {
        resolved = getFallbackTableMetadataProvider().getRowGroupsMetadataMap();
      } catch (IOException e) {
        throw MetadataException.of(MetadataException.MetadataExceptionType.FALLBACK_EXCEPTION, e);
      }
    }

    // publish to the cache only once the lookup has fully succeeded
    rowGroups = resolved;
    return rowGroups;
  }

  /**
   * @return key set of the files metadata map
   */
  @Override
  public Set<Path> getFileSet() {
    throwIfChanged();
    return getFilesMetadataMap().keySet();
  }

  /**
   * Returns the fallback provider, building it from the stored builder on first use.
   *
   * @return fallback provider, or {@code null} if no fallback builder was stored
   * @throws IOException if building the fallback provider fails
   */
  private ParquetMetadataProvider getFallbackTableMetadataProvider() throws IOException {
    if (fallback == null) {
      fallback = fallbackBuilder == null ? null : fallbackBuilder.build();
    }
    return fallback;
  }

  /**
   * Builder for {@link MetastoreParquetTableMetadataProvider}. Every setting is forwarded to the
   * fallback builder so that the fallback provider can be built with the same configuration.
   */
  public static class Builder extends MetastoreFileTableMetadataProvider.Builder<Builder>
      implements ParquetTableMetadataProviderBuilder<Builder> {

    // parquet-typed view of the fallback builder held by the parent builder
    private final ParquetTableMetadataProviderBuilder<?> fallback;

    private List<ReadEntryWithPath> entries;

    /**
     * Creates the builder with a {@link ParquetTableMetadataProviderImpl.Builder} as fallback.
     *
     * @param source metadata provider manager passed to the parent builder
     */
    public Builder(MetastoreMetadataProviderManager source) {
      super(source, new ParquetTableMetadataProviderImpl.Builder(FileSystemMetadataProviderManager.init()));
      this.fallback = (ParquetTableMetadataProviderBuilder<?>) super.fallback;
    }

    /**
     * Stores the entries for the provider and forwards them to the fallback builder.
     */
    @Override
    public Builder withEntries(List<ReadEntryWithPath> entries) {
      this.entries = entries;
      fallback.withEntries(entries);
      return this;
    }

    /**
     * Forwards the selection root to the fallback builder; this builder does not store it.
     */
    @Override
    public Builder withSelectionRoot(Path selectionRoot) {
      fallback.withSelectionRoot(selectionRoot);
      return this;
    }

    /**
     * Forwards the cache file root to the fallback builder; this builder does not store it.
     */
    @Override
    public Builder withCacheFileRoot(Path cacheFileRoot) {
      fallback.withCacheFileRoot(cacheFileRoot);
      return this;
    }

    /**
     * Forwards the reader config to the fallback builder; this builder does not store it.
     */
    @Override
    public Builder withReaderConfig(ParquetReaderConfig readerConfig) {
      fallback.withReaderConfig(readerConfig);
      return this;
    }

    /**
     * Passes the file system to both the fallback builder and the parent builder.
     */
    @Override
    public Builder withFileSystem(DrillFileSystem fs) {
      fallback.withFileSystem(fs);
      return super.withFileSystem(fs);
    }

    /**
     * Forwards the corrupted dates flag to the fallback builder; this builder does not store it.
     */
    @Override
    public Builder withCorrectCorruptedDates(boolean autoCorrectCorruptedDates) {
      fallback.withCorrectCorruptedDates(autoCorrectCorruptedDates);
      return this;
    }

    /**
     * Passes the file selection to both the fallback builder and the parent builder.
     */
    @Override
    public Builder withSelection(FileSelection selection) {
      fallback.withSelection(selection);
      return super.withSelection(selection);
    }

    /**
     * Passes the schema to both the fallback builder and the parent builder.
     */
    @Override
    public Builder withSchema(TupleMetadata schema) {
      fallback.withSchema(schema);
      return super.withSchema(schema);
    }

    @Override
    public Builder self() {
      return this;
    }

    /**
     * Builds the provider. When no entries were supplied, they are derived from the selection:
     * files are listed under the selection root if the selection is not fully expanded,
     * otherwise the files of the selection are used. Scheme and authority are stripped from
     * the paths in both cases.
     *
     * @return new provider instance
     * @throws IOException if listing files under the selection root fails
     */
    @Override
    public ParquetTableMetadataProvider build() throws IOException {
      if (entries == null) {
        if (!selection().isExpandedFully()) {
          // NOTE(review): the 'true' flag presumably requests recursive listing - confirm against DrillFileSystemUtil
          entries = DrillFileSystemUtil.listFiles(fs(), selection().getSelectionRoot(), true).stream()
              .map(fileStatus -> new ReadEntryWithPath(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath())))
              .collect(Collectors.toList());
        } else {
          entries = selection().getFiles().stream()
              .map(Path::getPathWithoutSchemeAndAuthority)
              .map(ReadEntryWithPath::new)
              .collect(Collectors.toList());
        }
      }

      // the inherited 'paths' field takes the URI path component of every entry
      paths = entries.stream()
          .map(readEntryWithPath -> readEntryWithPath.getPath().toUri().getPath())
          .collect(Collectors.toList());

      return new MetastoreParquetTableMetadataProvider(this);
    }
  }
}