| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg; |
| |
| import static org.apache.iceberg.expressions.Expressions.alwaysTrue; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.iceberg.avro.Avro; |
| import org.apache.iceberg.avro.AvroIterable; |
| import org.apache.iceberg.exceptions.RuntimeIOException; |
| import org.apache.iceberg.expressions.Evaluator; |
| import org.apache.iceberg.expressions.Expression; |
| import org.apache.iceberg.expressions.Expressions; |
| import org.apache.iceberg.expressions.InclusiveMetricsEvaluator; |
| import org.apache.iceberg.expressions.Projections; |
| import org.apache.iceberg.io.CloseableGroup; |
| import org.apache.iceberg.io.CloseableIterable; |
| import org.apache.iceberg.io.CloseableIterator; |
| import org.apache.iceberg.io.InputFile; |
| import org.apache.iceberg.metrics.ScanMetrics; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.apache.iceberg.types.Types; |
| import org.apache.iceberg.util.PartitionSet; |
| |
| /** |
| * Base reader for data and delete manifest files. |
| * |
| * @param <F> The Java class of files returned by this reader. |
| */ |
/**
 * Base reader for data and delete manifest files.
 *
 * <p>Instances are configured fluently (filters, projection, case sensitivity, scan metrics) and
 * then iterated. Underlying Avro readers are registered with the inherited {@link CloseableGroup}
 * so that closing this reader closes them as well.
 *
 * @param <F> The Java class of files returned by this reader.
 */
public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
    implements CloseableIterable<F> {
  // Sentinel column list meaning "project every column".
  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");

  // Columns needed by the inclusive metrics evaluator; added to the projection automatically
  // when a row filter is set (see requireStatsProjection/withStatsColumns below).
  private static final Set<String> STATS_COLUMNS =
      ImmutableSet.of(
          "value_counts",
          "null_value_counts",
          "nan_value_counts",
          "lower_bounds",
          "upper_bounds",
          "record_count");

  /**
   * Kind of content stored in the manifest, paired with the fully-qualified name of the concrete
   * file class that Avro should instantiate for the "data_file" record.
   */
  protected enum FileType {
    DATA_FILES(GenericDataFile.class.getName()),
    DELETE_FILES(GenericDeleteFile.class.getName());

    private final String fileClass;

    FileType(String fileClass) {
      this.fileClass = fileClass;
    }

    // Class name used for the Avro record renames in open().
    private String fileClass() {
      return fileClass;
    }
  }

  private final InputFile file;
  private final InheritableMetadata inheritableMetadata;
  private final FileType content;
  private final PartitionSpec spec;
  private final Schema fileSchema;

  // updated by configuration methods
  private PartitionSet partitionSet = null;
  private Expression partFilter = alwaysTrue();
  private Expression rowFilter = alwaysTrue();
  private Schema fileProjection = null;
  private Collection<String> columns = null;
  private boolean caseSensitive = true;
  private ScanMetrics scanMetrics = ScanMetrics.noop();

  // lazily initialized
  private Evaluator lazyEvaluator = null;
  private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;

  /**
   * Creates a reader for the given manifest file.
   *
   * @param file manifest file to read
   * @param specId partition spec ID recorded for the manifest
   * @param specsById known specs by ID; when null, the spec is recovered from the manifest's own
   *     Avro key-value metadata instead
   * @param inheritableMetadata metadata applied to each entry as it is read
   * @param content whether this manifest holds data files or delete files
   */
  protected ManifestReader(
      InputFile file,
      int specId,
      Map<Integer, PartitionSpec> specsById,
      InheritableMetadata inheritableMetadata,
      FileType content) {
    this.file = file;
    this.inheritableMetadata = inheritableMetadata;
    this.content = content;

    if (specsById != null) {
      this.spec = specsById.get(specId);
    } else {
      // fall back to the spec embedded in the manifest file's metadata
      this.spec = readPartitionSpec(file);
    }

    // schema of data_file rows, specialized to this spec's partition struct
    this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
  }

  /**
   * Reconstructs the partition spec from the manifest's embedded key-value metadata
   * ("schema", "partition-spec", "partition-spec-id").
   */
  private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile inputFile) {
    Map<String, String> metadata = readMetadata(inputFile);

    // default to the initial spec ID when the property is absent (older manifests)
    int specId = TableMetadata.INITIAL_SPEC_ID;
    String specProperty = metadata.get("partition-spec-id");
    if (specProperty != null) {
      specId = Integer.parseInt(specProperty);
    }

    Schema schema = SchemaParser.fromJson(metadata.get("schema"));
    return PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
  }

  /**
   * Reads only the Avro file header to obtain the manifest's key-value metadata; projects a
   * minimal schema ("status") so no row data needs to be decoded.
   *
   * @throws RuntimeIOException if the manifest header cannot be read
   */
  private static <T extends ContentFile<T>> Map<String, String> readMetadata(InputFile inputFile) {
    Map<String, String> metadata;
    try {
      try (AvroIterable<ManifestEntry<T>> headerReader =
          Avro.read(inputFile)
              .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
              .classLoader(GenericManifestEntry.class.getClassLoader())
              .build()) {
        metadata = headerReader.getMetadata();
      }
    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }
    return metadata;
  }

  /** Returns true if this reader was created for a delete manifest. */
  public boolean isDeleteManifestReader() {
    return content == FileType.DELETE_FILES;
  }

  /** Returns the underlying manifest {@link InputFile}. */
  public InputFile file() {
    return file;
  }

  /** Returns the data_file schema for this manifest, including its partition struct. */
  public Schema schema() {
    return fileSchema;
  }

  /** Returns the partition spec used to write this manifest. */
  public PartitionSpec spec() {
    return spec;
  }

  /**
   * Selects columns by name. Mutually exclusive with {@link #project(Schema)}.
   *
   * @throws IllegalStateException if a projection schema was already set
   */
  public ManifestReader<F> select(Collection<String> newColumns) {
    Preconditions.checkState(
        fileProjection == null,
        "Cannot select columns using both select(String...) and project(Schema)");
    this.columns = newColumns;
    return this;
  }

  /**
   * Sets an explicit projection schema. Mutually exclusive with {@link #select(Collection)}.
   *
   * @throws IllegalStateException if columns were already selected
   */
  public ManifestReader<F> project(Schema newFileProjection) {
    Preconditions.checkState(
        columns == null, "Cannot select columns using both select(String...) and project(Schema)");
    this.fileProjection = newFileProjection;
    return this;
  }

  /** ANDs the given expression into the partition filter. */
  public ManifestReader<F> filterPartitions(Expression expr) {
    this.partFilter = Expressions.and(partFilter, expr);
    return this;
  }

  /** Restricts results to files whose (specId, partition) is in the given set. */
  public ManifestReader<F> filterPartitions(PartitionSet partitions) {
    this.partitionSet = partitions;
    return this;
  }

  /** ANDs the given expression into the row filter; evaluated against file metrics. */
  public ManifestReader<F> filterRows(Expression expr) {
    this.rowFilter = Expressions.and(rowFilter, expr);
    return this;
  }

  /** Sets whether filter and column binding is case sensitive (default: true). */
  public ManifestReader<F> caseSensitive(boolean isCaseSensitive) {
    this.caseSensitive = isCaseSensitive;
    return this;
  }

  // package-private: wired up by the scan that owns this reader
  ManifestReader<F> scanMetrics(ScanMetrics newScanMetrics) {
    this.scanMetrics = newScanMetrics;
    return this;
  }

  /** Returns all entries, including deleted ones, after applying any configured filters. */
  CloseableIterable<ManifestEntry<F>> entries() {
    return entries(false /* all entries */);
  }

  /**
   * Returns entries after applying the configured partition/row filters.
   *
   * @param onlyLive when true, entries with DELETED status are dropped before filtering
   */
  private CloseableIterable<ManifestEntry<F>> entries(boolean onlyLive) {
    if (hasRowFilter() || hasPartitionFilter() || partitionSet != null) {
      Evaluator evaluator = evaluator();
      InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

      // ensure stats columns are present for metrics evaluation
      boolean requireStatsProjection = requireStatsProjection(rowFilter, columns);
      Collection<String> projectColumns =
          requireStatsProjection ? withStatsColumns(columns) : columns;
      CloseableIterable<ManifestEntry<F>> entries =
          open(projection(fileSchema, fileProjection, projectColumns, caseSensitive));

      // count skipped files against the appropriate scan metric for this manifest's content
      return CloseableIterable.filter(
          content == FileType.DATA_FILES
              ? scanMetrics.skippedDataFiles()
              : scanMetrics.skippedDeleteFiles(),
          onlyLive ? filterLiveEntries(entries) : entries,
          entry ->
              entry != null
                  && evaluator.eval(entry.file().partition())
                  && metricsEvaluator.eval(entry.file())
                  && inPartitionSet(entry.file()));
    } else {
      // no filters configured: skip evaluator construction entirely
      CloseableIterable<ManifestEntry<F>> entries =
          open(projection(fileSchema, fileProjection, columns, caseSensitive));
      return onlyLive ? filterLiveEntries(entries) : entries;
    }
  }

  private boolean hasRowFilter() {
    // identity compare is intentional: alwaysTrue() returns a singleton
    return rowFilter != null && rowFilter != Expressions.alwaysTrue();
  }

  private boolean hasPartitionFilter() {
    // identity compare is intentional: alwaysTrue() returns a singleton
    return partFilter != null && partFilter != Expressions.alwaysTrue();
  }

  // true when no partition set was configured, or the file's partition is in it
  private boolean inPartitionSet(F fileToCheck) {
    return partitionSet == null
        || partitionSet.contains(fileToCheck.specId(), fileToCheck.partition());
  }

  /**
   * Opens the manifest with the given projection and registers the reader for close.
   * Only Avro manifests are supported.
   */
  private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
    FileFormat format = FileFormat.fromFileName(file.location());
    Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);

    // always read row position alongside the projected fields
    List<Types.NestedField> fields = Lists.newArrayList();
    fields.addAll(projection.asStruct().fields());
    fields.add(MetadataColumns.ROW_POSITION);

    switch (format) {
      case AVRO:
        // map generic Avro record names to concrete classes; "r102"/"r2" appear to be
        // field-id-derived record names from the manifest Avro schema (r102 = partition,
        // r2 = data_file) — NOTE(review): confirm against AvroSchemaUtil naming
        AvroIterable<ManifestEntry<F>> reader =
            Avro.read(file)
                .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
                .rename("manifest_entry", GenericManifestEntry.class.getName())
                .rename("partition", PartitionData.class.getName())
                .rename("r102", PartitionData.class.getName())
                .rename("data_file", content.fileClass())
                .rename("r2", content.fileClass())
                .classLoader(GenericManifestEntry.class.getClassLoader())
                .reuseContainers()
                .build();

        // close the Avro reader when this ManifestReader is closed
        addCloseable(reader);

        // stamp inherited metadata (e.g. snapshot id) onto each entry
        return CloseableIterable.transform(reader, inheritableMetadata::apply);

      default:
        throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
    }
  }

  /** Returns only live (EXISTING or ADDED) entries after applying any configured filters. */
  CloseableIterable<ManifestEntry<F>> liveEntries() {
    return entries(true /* only live entries */);
  }

  private CloseableIterable<ManifestEntry<F>> filterLiveEntries(
      CloseableIterable<ManifestEntry<F>> entries) {
    return CloseableIterable.filter(entries, this::isLiveEntry);
  }

  // live = any status other than DELETED
  private boolean isLiveEntry(ManifestEntry<F> entry) {
    return entry != null && entry.status() != ManifestEntry.Status.DELETED;
  }

  /** @return an Iterator of DataFile. Makes defensive copies of files before returning */
  @Override
  public CloseableIterator<F> iterator() {
    // stats are dropped from the copies unless the projection asked for them
    boolean dropStats = dropStats(columns);
    return CloseableIterable.transform(liveEntries(), e -> e.file().copy(!dropStats)).iterator();
  }

  /**
   * Resolves the effective projection: selected columns win over an explicit projection schema;
   * with neither configured, the full file schema is used.
   */
  private static Schema projection(
      Schema schema, Schema project, Collection<String> columns, boolean caseSensitive) {
    if (columns != null) {
      if (caseSensitive) {
        return schema.select(columns);
      } else {
        return schema.caseInsensitiveSelect(columns);
      }
    } else if (project != null) {
      return project;
    }

    return schema;
  }

  /**
   * Lazily builds the partition evaluator: the row filter projected onto the partition spec,
   * ANDed with the explicit partition filter.
   */
  private Evaluator evaluator() {
    if (lazyEvaluator == null) {
      Expression projected = Projections.inclusive(spec, caseSensitive).project(rowFilter);
      Expression finalPartFilter = Expressions.and(projected, partFilter);
      // NOTE(review): the else branch looks unreachable — Expressions.and of two non-null
      // expressions should not return null; kept as defensive code
      if (finalPartFilter != null) {
        this.lazyEvaluator = new Evaluator(spec.partitionType(), finalPartFilter, caseSensitive);
      } else {
        this.lazyEvaluator =
            new Evaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
      }
    }
    return lazyEvaluator;
  }

  /** Lazily builds the metrics evaluator for the row filter (alwaysTrue when unset). */
  private InclusiveMetricsEvaluator metricsEvaluator() {
    if (lazyMetricsEvaluator == null) {
      if (rowFilter != null) {
        this.lazyMetricsEvaluator =
            new InclusiveMetricsEvaluator(spec.schema(), rowFilter, caseSensitive);
      } else {
        this.lazyMetricsEvaluator =
            new InclusiveMetricsEvaluator(spec.schema(), Expressions.alwaysTrue(), caseSensitive);
      }
    }
    return lazyMetricsEvaluator;
  }

  /**
   * True when a row filter is set but the selected columns do not already include all columns
   * or all stats columns — i.e. stats must be added to the projection for metrics evaluation.
   */
  private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
    // Make sure we have all stats columns for metrics evaluator
    return rowFilter != Expressions.alwaysTrue()
        && columns != null
        && !columns.containsAll(ManifestReader.ALL_COLUMNS)
        && !columns.containsAll(STATS_COLUMNS);
  }

  static boolean dropStats(Collection<String> columns) {
    // Make sure we only drop all stats if we had projected all stats
    // We do not drop stats even if we had partially added some stats columns, except for
    // record_count column.
    // Since we don't want to keep stats map which could be huge in size just because we select
    // record_count, which
    // is a primitive type.
    if (columns != null && !columns.containsAll(ManifestReader.ALL_COLUMNS)) {
      Set<String> intersection = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS);
      return intersection.isEmpty() || intersection.equals(Sets.newHashSet("record_count"));
    }
    return false;
  }

  /** Returns the selected columns plus all stats columns (no-op copy when "*" is selected). */
  static List<String> withStatsColumns(Collection<String> columns) {
    if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
      return Lists.newArrayList(columns);
    } else {
      List<String> projectColumns = Lists.newArrayList(columns);
      projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
      return projectColumns;
    }
  }
}