blob: a9c6f50e0b36fcb8c810a72d4d9e83a852112e0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;
/**
* A {@link Table} implementation that exposes a table's valid manifest files as rows.
*
* <p>A valid manifest file is one that is referenced from any snapshot currently tracked by the
* table.
*
* <p>This table may return duplicate rows.
*/
public class AllManifestsTable extends BaseMetadataTable {
public static final Types.NestedField REF_SNAPSHOT_ID =
Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get());
private static final Schema MANIFEST_FILE_SCHEMA =
new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
Types.NestedField.required(1, "path", Types.StringType.get()),
Types.NestedField.required(2, "length", Types.LongType.get()),
Types.NestedField.optional(3, "partition_spec_id", Types.IntegerType.get()),
Types.NestedField.optional(4, "added_snapshot_id", Types.LongType.get()),
Types.NestedField.optional(5, "added_data_files_count", Types.IntegerType.get()),
Types.NestedField.optional(6, "existing_data_files_count", Types.IntegerType.get()),
Types.NestedField.optional(7, "deleted_data_files_count", Types.IntegerType.get()),
Types.NestedField.required(15, "added_delete_files_count", Types.IntegerType.get()),
Types.NestedField.required(16, "existing_delete_files_count", Types.IntegerType.get()),
Types.NestedField.required(17, "deleted_delete_files_count", Types.IntegerType.get()),
Types.NestedField.optional(
8,
"partition_summaries",
Types.ListType.ofRequired(
9,
Types.StructType.of(
Types.NestedField.required(10, "contains_null", Types.BooleanType.get()),
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())))),
REF_SNAPSHOT_ID);
AllManifestsTable(Table table) {
this(table, table.name() + ".all_manifests");
}
AllManifestsTable(Table table, String name) {
super(table, name);
}
@Override
public TableScan newScan() {
return new AllManifestsTableScan(table(), MANIFEST_FILE_SCHEMA);
}
@Override
public Schema schema() {
return MANIFEST_FILE_SCHEMA;
}
@Override
MetadataTableType metadataTableType() {
return MetadataTableType.ALL_MANIFESTS;
}
public static class AllManifestsTableScan extends BaseAllMetadataTableScan {
AllManifestsTableScan(Table table, Schema fileSchema) {
super(table, fileSchema, MetadataTableType.ALL_MANIFESTS);
}
private AllManifestsTableScan(Table table, Schema schema, TableScanContext context) {
super(table, schema, MetadataTableType.ALL_MANIFESTS, context);
}
@Override
protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext context) {
return new AllManifestsTableScan(table, schema, context);
}
@Override
protected CloseableIterable<FileScanTask> doPlanFiles() {
FileIO io = table().io();
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());
Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
SnapshotEvaluator snapshotEvaluator =
new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(), isCaseSensitive());
Iterable<Snapshot> filteredSnapshots =
Iterables.filter(table().snapshots(), snapshotEvaluator::eval);
return CloseableIterable.withNoopClose(
Iterables.transform(
filteredSnapshots,
snap -> {
if (snap.manifestListLocation() != null) {
return new ManifestListReadTask(
io, schema(), specs, snap.manifestListLocation(), filter, snap.snapshotId());
} else {
return StaticDataTask.of(
io.newInputFile(
((BaseTable) table()).operations().current().metadataFileLocation()),
MANIFEST_FILE_SCHEMA,
schema(),
snap.allManifests(io),
manifest ->
manifestFileToRow(
specs.get(manifest.partitionSpecId()), manifest, snap.snapshotId()));
}
}));
}
}
static class ManifestListReadTask implements DataTask {
private final FileIO io;
private final Schema schema;
private final Map<Integer, PartitionSpec> specs;
private final String manifestListLocation;
private final Expression residual;
private final long referenceSnapshotId;
private DataFile lazyDataFile = null;
ManifestListReadTask(
FileIO io,
Schema schema,
Map<Integer, PartitionSpec> specs,
String manifestListLocation,
Expression residual,
long referenceSnapshotId) {
this.io = io;
this.schema = schema;
this.specs = specs;
this.manifestListLocation = manifestListLocation;
this.residual = residual;
this.referenceSnapshotId = referenceSnapshotId;
}
@Override
public List<DeleteFile> deletes() {
return ImmutableList.of();
}
@Override
public CloseableIterable<StructLike> rows() {
try (CloseableIterable<ManifestFile> manifests =
Avro.read(io.newInputFile(manifestListLocation))
.rename("manifest_file", GenericManifestFile.class.getName())
.rename("partitions", GenericPartitionFieldSummary.class.getName())
.rename("r508", GenericPartitionFieldSummary.class.getName())
.project(ManifestFile.schema())
.classLoader(GenericManifestFile.class.getClassLoader())
.reuseContainers(false)
.build()) {
CloseableIterable<StructLike> rowIterable =
CloseableIterable.transform(
manifests,
manifest ->
manifestFileToRow(
specs.get(manifest.partitionSpecId()), manifest, referenceSnapshotId));
StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
return CloseableIterable.transform(rowIterable, projection::wrap);
} catch (IOException e) {
throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListLocation);
}
}
@Override
public DataFile file() {
if (lazyDataFile == null) {
this.lazyDataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(io.newInputFile(manifestListLocation))
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
}
return lazyDataFile;
}
@Override
public PartitionSpec spec() {
return PartitionSpec.unpartitioned();
}
@Override
public long start() {
return 0;
}
@Override
public long length() {
// return a generic length to avoid looking up the actual length
return 8192;
}
@Override
public Expression residual() {
// this table is unpartitioned so the residual is always constant
return residual;
}
@Override
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}
}
static StaticDataTask.Row manifestFileToRow(
PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
return StaticDataTask.Row.of(
manifest.content().id(),
manifest.path(),
manifest.length(),
manifest.partitionSpecId(),
manifest.snapshotId(),
manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
referenceSnapshotId);
}
private static class SnapshotEvaluator {
private final Expression boundExpr;
private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
this.boundExpr = Binder.bind(structType, expr, caseSensitive);
}
private boolean eval(Snapshot snapshot) {
return new SnapshotEvalVisitor().eval(snapshot);
}
private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
private long snapshotId;
private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;
private boolean eval(Snapshot snapshot) {
this.snapshotId = snapshot.snapshotId();
return ExpressionVisitors.visitEvaluator(boundExpr, this);
}
@Override
public Boolean alwaysTrue() {
return ROWS_MIGHT_MATCH;
}
@Override
public Boolean alwaysFalse() {
return ROWS_CANNOT_MATCH;
}
@Override
public Boolean not(Boolean result) {
return !result;
}
@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
}
@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
}
@Override
public <T> Boolean isNull(BoundReference<T> ref) {
if (isSnapshotRef(ref)) {
return ROWS_CANNOT_MATCH; // reference_snapshot_id is never null
} else {
return ROWS_MIGHT_MATCH;
}
}
@Override
public <T> Boolean notNull(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}
@Override
public <T> Boolean isNaN(BoundReference<T> ref) {
if (isSnapshotRef(ref)) {
return ROWS_CANNOT_MATCH; // reference_snapshot_id is never nan
} else {
return ROWS_MIGHT_MATCH;
}
}
@Override
public <T> Boolean notNaN(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}
@Override
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
}
@Override
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
}
@Override
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
}
@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
}
@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
}
@Override
public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
}
@Override
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
if (isSnapshotRef(ref)) {
if (!literalSet.contains(snapshotId)) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}
@Override
public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
if (isSnapshotRef(ref)) {
if (literalSet.contains(snapshotId)) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}
@Override
public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}
@Override
public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}
/**
* Comparison of snapshot reference and literal, using long comparator.
*
* @param ref bound reference, comparison attempted only if reference is for
* reference_snapshot_id
* @param lit literal value to compare with snapshot id.
* @param desiredResult function to apply to long comparator result, returns true if result is
* as expected.
* @return false if comparator does not achieve desired result, true otherwise
*/
private <T> Boolean compareSnapshotRef(
BoundReference<T> ref, Literal<T> lit, Function<Integer, Boolean> desiredResult) {
if (isSnapshotRef(ref)) {
Literal<Long> longLit = lit.to(Types.LongType.get());
int cmp = longLit.comparator().compare(snapshotId, longLit.value());
if (!desiredResult.apply(cmp)) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}
private <T> boolean isSnapshotRef(BoundReference<T> ref) {
return ref.fieldId() == REF_SNAPSHOT_ID.fieldId();
}
}
}
}