blob: 9e811f4969f5ebe5ae1817606bde1815deceb305 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.IOException;
import java.util.stream.StreamSupport;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@RunWith(Parameterized.class)
public class TestMetadataTableScans extends TableTestBase {
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] { 1, 2 };
}
public TestMetadataTableScans(int formatVersion) {
super(formatVersion);
}
private void preparePartitionedTable() {
table.newFastAppend()
.appendFile(FILE_PARTITION_0)
.commit();
table.newFastAppend()
.appendFile(FILE_PARTITION_1)
.commit();
table.newFastAppend()
.appendFile(FILE_PARTITION_2)
.commit();
table.newFastAppend()
.appendFile(FILE_PARTITION_3)
.commit();
}
@Test
public void testManifestsTableAlwaysIgnoresResiduals() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Table manifestsTable = new ManifestsTable(table.ops(), table);
TableScan scan = manifestsTable.newScan()
.filter(Expressions.lessThan("length", 10000L));
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
for (FileScanTask task : tasks) {
Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
}
}
}
@Test
public void testDataFilesTableHonorsIgnoreResiduals() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
TableScan scan1 = dataFilesTable.newScan()
.filter(Expressions.equal("record_count", 1));
validateTaskScanResiduals(scan1, false);
TableScan scan2 = dataFilesTable.newScan()
.filter(Expressions.equal("record_count", 1))
.ignoreResiduals();
validateTaskScanResiduals(scan2, true);
}
@Test
public void testManifestEntriesTableHonorsIgnoreResiduals() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Table manifestEntriesTable = new ManifestEntriesTable(table.ops(), table);
TableScan scan1 = manifestEntriesTable.newScan()
.filter(Expressions.equal("snapshot_id", 1L));
validateTaskScanResiduals(scan1, false);
TableScan scan2 = manifestEntriesTable.newScan()
.filter(Expressions.equal("snapshot_id", 1L))
.ignoreResiduals();
validateTaskScanResiduals(scan2, true);
}
@Test
public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Table allDataFilesTable = new AllDataFilesTable(table.ops(), table);
TableScan scan1 = allDataFilesTable.newScan()
.filter(Expressions.equal("record_count", 1));
validateTaskScanResiduals(scan1, false);
TableScan scan2 = allDataFilesTable.newScan()
.filter(Expressions.equal("record_count", 1))
.ignoreResiduals();
validateTaskScanResiduals(scan2, true);
}
@Test
public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Table allEntriesTable = new AllEntriesTable(table.ops(), table);
TableScan scan1 = allEntriesTable.newScan()
.filter(Expressions.equal("snapshot_id", 1L));
validateTaskScanResiduals(scan1, false);
TableScan scan2 = allEntriesTable.newScan()
.filter(Expressions.equal("snapshot_id", 1L))
.ignoreResiduals();
validateTaskScanResiduals(scan2, true);
}
@Test
public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Table allManifestsTable = new AllManifestsTable(table.ops(), table);
TableScan scan1 = allManifestsTable.newScan()
.filter(Expressions.lessThan("length", 10000L));
validateTaskScanResiduals(scan1, false);
TableScan scan2 = allManifestsTable.newScan()
.filter(Expressions.lessThan("length", 10000L))
.ignoreResiduals();
validateTaskScanResiduals(scan2, true);
}
@Test
public void testPartitionsTableScanNoFilter() {
preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Types.StructType expected = new Schema(
required(1, "partition", Types.StructType.of(
optional(1000, "data_bucket", Types.IntegerType.get())))).asStruct();
TableScan scanNoFilter = partitionsTable.newScan().select("partition.data_bucket");
Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
CloseableIterable<FileScanTask> tasksNoFilter = PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
Assert.assertEquals(4, Iterators.size(tasksNoFilter.iterator()));
validateIncludesPartitionScan(tasksNoFilter, 0);
validateIncludesPartitionScan(tasksNoFilter, 1);
validateIncludesPartitionScan(tasksNoFilter, 2);
validateIncludesPartitionScan(tasksNoFilter, 3);
}
@Test
public void testPartitionsTableScanAndFilter() {
preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Expression andEquals = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
CloseableIterable<FileScanTask> tasksAndEq = PartitionsTable.planFiles((StaticTableScan) scanAndEq);
Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
validateIncludesPartitionScan(tasksAndEq, 0);
}
@Test
public void testPartitionsTableScanLtFilter() {
preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Expression ltAnd = Expressions.and(
Expressions.lessThan("partition.data_bucket", 2),
Expressions.greaterThan("record_count", 0));
TableScan scanLtAnd = partitionsTable.newScan().filter(ltAnd);
CloseableIterable<FileScanTask> tasksLtAnd = PartitionsTable.planFiles((StaticTableScan) scanLtAnd);
Assert.assertEquals(2, Iterators.size(tasksLtAnd.iterator()));
validateIncludesPartitionScan(tasksLtAnd, 0);
validateIncludesPartitionScan(tasksLtAnd, 1);
}
@Test
public void testPartitionsTableScanOrFilter() {
preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Expression or = Expressions.or(
Expressions.equal("partition.data_bucket", 2),
Expressions.greaterThan("record_count", 0));
TableScan scanOr = partitionsTable.newScan().filter(or);
CloseableIterable<FileScanTask> tasksOr = PartitionsTable.planFiles((StaticTableScan) scanOr);
Assert.assertEquals(4, Iterators.size(tasksOr.iterator()));
validateIncludesPartitionScan(tasksOr, 0);
validateIncludesPartitionScan(tasksOr, 1);
validateIncludesPartitionScan(tasksOr, 2);
validateIncludesPartitionScan(tasksOr, 3);
}
@Test
public void testPartitionsScanNotFilter() {
preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
TableScan scanNot = partitionsTable.newScan().filter(not);
CloseableIterable<FileScanTask> tasksNot = PartitionsTable.planFiles((StaticTableScan) scanNot);
Assert.assertEquals(2, Iterators.size(tasksNot.iterator()));
validateIncludesPartitionScan(tasksNot, 2);
validateIncludesPartitionScan(tasksNot, 3);
}
@Test
public void testPartitionsTableScanInFilter() {
preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Expression set = Expressions.in("partition.data_bucket", 2, 3);
TableScan scanSet = partitionsTable.newScan().filter(set);
CloseableIterable<FileScanTask> tasksSet = PartitionsTable.planFiles((StaticTableScan) scanSet);
Assert.assertEquals(2, Iterators.size(tasksSet.iterator()));
validateIncludesPartitionScan(tasksSet, 2);
validateIncludesPartitionScan(tasksSet, 3);
}
@Test
public void testPartitionsTableScanNotNullFilter() {
preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Expression unary = Expressions.notNull("partition.data_bucket");
TableScan scanUnary = partitionsTable.newScan().filter(unary);
CloseableIterable<FileScanTask> tasksUnary = PartitionsTable.planFiles((StaticTableScan) scanUnary);
Assert.assertEquals(4, Iterators.size(tasksUnary.iterator()));
validateIncludesPartitionScan(tasksUnary, 0);
validateIncludesPartitionScan(tasksUnary, 1);
validateIncludesPartitionScan(tasksUnary, 2);
validateIncludesPartitionScan(tasksUnary, 3);
}
@Test
public void testFilesTableScanNoFilter() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Types.StructType expected = new Schema(
required(102, "partition", Types.StructType.of(
optional(1000, "data_bucket", Types.IntegerType.get())),
"Partition data tuple, schema based on the partition spec")).asStruct();
TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
CloseableIterable<FileScanTask> tasksAndEq = scanNoFilter.planFiles();
Assert.assertEquals(4, Iterables.size(tasksAndEq));
validateFileScanTasks(tasksAndEq, 0);
validateFileScanTasks(tasksAndEq, 1);
validateFileScanTasks(tasksAndEq, 2);
validateFileScanTasks(tasksAndEq, 3);
}
@Test
public void testFilesTableScanAndFilter() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Expression andEquals = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
CloseableIterable<FileScanTask> tasksAndEq = scanAndEq.planFiles();
Assert.assertEquals(1, Iterables.size(tasksAndEq));
validateFileScanTasks(tasksAndEq, 0);
}
@Test
public void testFilesTableScanAndFilterWithPlanTasks() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Expression andEquals = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
CloseableIterable<CombinedScanTask> tasksAndEq = scanAndEq.planTasks();
Assert.assertEquals(1, Iterables.size(tasksAndEq));
validateCombinedScanTasks(tasksAndEq, 0);
}
@Test
public void testFilesTableScanLtFilter() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Expression lt = Expressions.lessThan("partition.data_bucket", 2);
TableScan scan = dataFilesTable.newScan().filter(lt);
CloseableIterable<FileScanTask> tasksLt = scan.planFiles();
Assert.assertEquals(2, Iterables.size(tasksLt));
validateFileScanTasks(tasksLt, 0);
validateFileScanTasks(tasksLt, 1);
}
@Test
public void testFilesTableScanOrFilter() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Expression or = Expressions.or(
Expressions.equal("partition.data_bucket", 2),
Expressions.greaterThan("record_count", 0));
TableScan scan = dataFilesTable.newScan()
.filter(or);
CloseableIterable<FileScanTask> tasksOr = scan.planFiles();
Assert.assertEquals(4, Iterables.size(tasksOr));
validateFileScanTasks(tasksOr, 0);
validateFileScanTasks(tasksOr, 1);
validateFileScanTasks(tasksOr, 2);
validateFileScanTasks(tasksOr, 3);
}
@Test
public void testFilesScanNotFilter() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
TableScan scan = dataFilesTable.newScan()
.filter(not);
CloseableIterable<FileScanTask> tasksNot = scan.planFiles();
Assert.assertEquals(2, Iterables.size(tasksNot));
validateFileScanTasks(tasksNot, 2);
validateFileScanTasks(tasksNot, 3);
}
@Test
public void testFilesTableScanInFilter() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Expression set = Expressions.in("partition.data_bucket", 2, 3);
TableScan scan = dataFilesTable.newScan()
.filter(set);
CloseableIterable<FileScanTask> tasksNot = scan.planFiles();
Assert.assertEquals(2, Iterables.size(tasksNot));
validateFileScanTasks(tasksNot, 2);
validateFileScanTasks(tasksNot, 3);
}
@Test
public void testFilesTableScanNotNullFilter() {
preparePartitionedTable();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
Expression unary = Expressions.notNull("partition.data_bucket");
TableScan scan = dataFilesTable.newScan()
.filter(unary);
CloseableIterable<FileScanTask> tasksUnary = scan.planFiles();
Assert.assertEquals(4, Iterables.size(tasksUnary));
validateFileScanTasks(tasksUnary, 0);
validateFileScanTasks(tasksUnary, 1);
validateFileScanTasks(tasksUnary, 2);
validateFileScanTasks(tasksUnary, 3);
}
@Test
public void testDataFilesTableSelection() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Table dataFilesTable = new DataFilesTable(table.ops(), table);
TableScan scan = dataFilesTable.newScan()
.filter(Expressions.equal("record_count", 1))
.select("content", "record_count");
validateTaskScanResiduals(scan, false);
Types.StructType expected = new Schema(
optional(134, "content", Types.IntegerType.get(),
"Contents of the file: 0=data, 1=position deletes, 2=equality deletes"),
required(103, "record_count", Types.LongType.get(), "Number of records in the file")
).asStruct();
Assert.assertEquals(expected, scan.schema().asStruct());
}
private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException {
try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
for (CombinedScanTask combinedScanTask : tasks) {
for (FileScanTask fileScanTask : combinedScanTask.files()) {
if (ignoreResiduals) {
Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
} else {
Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
}
}
}
}
}
private void validateIncludesPartitionScan(CloseableIterable<FileScanTask> tasks, int partValue) {
Assert.assertTrue("File scan tasks do not include correct file",
StreamSupport.stream(tasks.spliterator(), false).anyMatch(
a -> a.file().partition().get(0, Object.class).equals(partValue)));
}
private void validateFileScanTasks(CloseableIterable<FileScanTask> fileScanTasks, int partValue) {
Assert.assertTrue("File scan tasks do not include correct file",
StreamSupport.stream(fileScanTasks.spliterator(), false).anyMatch(t -> {
ManifestFile mf = ((DataFilesTable.ManifestReadTask) t).manifest();
return manifestHasPartition(mf, partValue);
}));
}
private void validateCombinedScanTasks(CloseableIterable<CombinedScanTask> tasks, int partValue) {
StreamSupport.stream(tasks.spliterator(), false)
.flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).manifest()))
.anyMatch(m -> manifestHasPartition(m, partValue));
}
private boolean manifestHasPartition(ManifestFile mf, int partValue) {
int lower = Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).lowerBound());
int upper = Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).upperBound());
return (lower <= partValue) && (upper >= partValue);
}
}