| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.List; |
| import org.apache.iceberg.expressions.Expressions; |
| import org.apache.iceberg.io.CloseableIterable; |
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| @RunWith(Parameterized.class) |
| public class TestIncrementalDataTableScan extends TableTestBase { |
| @Parameterized.Parameters |
| public static Object[][] parameters() { |
| return new Object[][] { |
| new Object[] { 1 }, |
| new Object[] { 2 }, |
| }; |
| } |
| |
| public TestIncrementalDataTableScan(int formatVersion) { |
| super(formatVersion); |
| } |
| |
| @Before |
| public void setupTableProperties() { |
| table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit(); |
| } |
| |
| @Test |
| public void testInvalidScans() { |
| add(table.newAppend(), files("A")); |
| AssertHelpers.assertThrows( |
| "from and to snapshots cannot be the same, since from snapshot is exclusive and not part of the scan", |
| IllegalArgumentException.class, "from and to snapshot ids cannot be the same", |
| () -> appendsBetweenScan(1, 1)); |
| |
| add(table.newAppend(), files("B")); |
| add(table.newAppend(), files("C")); |
| add(table.newAppend(), files("D")); |
| add(table.newAppend(), files("E")); |
| AssertHelpers.assertThrows( |
| "Check refinement api", |
| IllegalArgumentException.class, "from snapshot id 1 not in existing snapshot ids range (2, 4]", |
| () -> table.newScan().appendsBetween(2, 5).appendsBetween(1, 4)); |
| AssertHelpers.assertThrows( |
| "Check refinement api", |
| IllegalArgumentException.class, "to snapshot id 3 not in existing snapshot ids range (1, 2]", |
| () -> table.newScan().appendsBetween(1, 2).appendsBetween(1, 3)); |
| } |
| |
| @Test |
| public void testAppends() { |
| add(table.newAppend(), files("A")); // 1 |
| add(table.newAppend(), files("B")); |
| add(table.newAppend(), files("C")); |
| add(table.newAppend(), files("D")); |
| add(table.newAppend(), files("E")); // 5 |
| filesMatch(Lists.newArrayList("B", "C", "D", "E"), appendsBetweenScan(1, 5)); |
| filesMatch(Lists.newArrayList("C", "D", "E"), appendsBetweenScan(2, 5)); |
| } |
| |
| @Test |
| public void testReplaceOverwritesDeletes() { |
| add(table.newAppend(), files("A")); // 1 |
| add(table.newAppend(), files("B")); |
| add(table.newAppend(), files("C")); |
| add(table.newAppend(), files("D")); |
| add(table.newAppend(), files("E")); // 5 |
| filesMatch(Lists.newArrayList("B", "C", "D", "E"), appendsBetweenScan(1, 5)); |
| |
| replace(table.newRewrite(), files("A", "B", "C"), files("F", "G")); // 6 |
| // Replace commits are ignored |
| filesMatch(Lists.newArrayList("B", "C", "D", "E"), appendsBetweenScan(1, 6)); |
| filesMatch(Lists.newArrayList("E"), appendsBetweenScan(4, 6)); |
| // 6th snapshot is a replace. No new content is added |
| Assert.assertTrue("Replace commits are ignored", appendsBetweenScan(5, 6).isEmpty()); |
| delete(table.newDelete(), files("D")); // 7 |
| // 7th snapshot is a delete. |
| Assert.assertTrue("Replace and delete commits are ignored", appendsBetweenScan(5, 7).isEmpty()); |
| Assert.assertTrue("Delete commits are ignored", appendsBetweenScan(6, 7).isEmpty()); |
| add(table.newAppend(), files("I")); // 8 |
| // snapshots 6 and 7 are ignored |
| filesMatch(Lists.newArrayList("B", "C", "D", "E", "I"), appendsBetweenScan(1, 8)); |
| filesMatch(Lists.newArrayList("I"), appendsBetweenScan(6, 8)); |
| filesMatch(Lists.newArrayList("I"), appendsBetweenScan(7, 8)); |
| |
| overwrite(table.newOverwrite(), files("H"), files("E")); // 9 |
| AssertHelpers.assertThrows( |
| "Overwrites are not supported for Incremental scan", UnsupportedOperationException.class, |
| "Found overwrite operation, cannot support incremental data in snapshots (8, 9]", |
| () -> appendsBetweenScan(8, 9)); |
| } |
| |
| @Test |
| public void testTransactions() { |
| Transaction transaction = table.newTransaction(); |
| |
| add(transaction.newAppend(), files("A")); // 1 |
| add(transaction.newAppend(), files("B")); |
| add(transaction.newAppend(), files("C")); |
| add(transaction.newAppend(), files("D")); |
| add(transaction.newAppend(), files("E")); // 5 |
| transaction.commitTransaction(); |
| filesMatch(Lists.newArrayList("B", "C", "D", "E"), appendsBetweenScan(1, 5)); |
| |
| transaction = table.newTransaction(); |
| replace(transaction.newRewrite(), files("A", "B", "C"), files("F", "G")); // 6 |
| transaction.commitTransaction(); |
| // Replace commits are ignored |
| filesMatch(Lists.newArrayList("B", "C", "D", "E"), appendsBetweenScan(1, 6)); |
| filesMatch(Lists.newArrayList("E"), appendsBetweenScan(4, 6)); |
| // 6th snapshot is a replace. No new content is added |
| Assert.assertTrue("Replace commits are ignored", appendsBetweenScan(5, 6).isEmpty()); |
| |
| transaction = table.newTransaction(); |
| delete(transaction.newDelete(), files("D")); // 7 |
| transaction.commitTransaction(); |
| // 7th snapshot is a delete. |
| Assert.assertTrue("Replace and delete commits are ignored", appendsBetweenScan(5, 7).isEmpty()); |
| Assert.assertTrue("Delete commits are ignored", appendsBetweenScan(6, 7).isEmpty()); |
| |
| transaction = table.newTransaction(); |
| add(transaction.newAppend(), files("I")); // 8 |
| transaction.commitTransaction(); |
| // snapshots 6, 7 and 8 are ignored |
| filesMatch(Lists.newArrayList("B", "C", "D", "E", "I"), appendsBetweenScan(1, 8)); |
| filesMatch(Lists.newArrayList("I"), appendsBetweenScan(6, 8)); |
| filesMatch(Lists.newArrayList("I"), appendsBetweenScan(7, 8)); |
| } |
| |
| @Test |
| public void testRollbacks() { |
| add(table.newAppend(), files("A")); // 1 |
| add(table.newAppend(), files("B")); |
| add(table.newAppend(), files("C")); // 3 |
| // Go back to snapshot "B" |
| table.rollback().toSnapshotId(2).commit(); // 2 |
| Assert.assertEquals(2, table.currentSnapshot().snapshotId()); |
| filesMatch(Lists.newArrayList("B"), appendsBetweenScan(1, 2)); |
| filesMatch(Lists.newArrayList("B"), appendsAfterScan(1)); |
| |
| Transaction transaction = table.newTransaction(); |
| add(transaction.newAppend(), files("D")); // 4 |
| add(transaction.newAppend(), files("E")); // 5 |
| add(transaction.newAppend(), files("F")); |
| transaction.commitTransaction(); |
| // Go back to snapshot "E" |
| table.rollback().toSnapshotId(5).commit(); |
| Assert.assertEquals(5, table.currentSnapshot().snapshotId()); |
| filesMatch(Lists.newArrayList("B", "D", "E"), appendsBetweenScan(1, 5)); |
| filesMatch(Lists.newArrayList("B", "D", "E"), appendsAfterScan(1)); |
| } |
| |
| @Test |
| public void testIgnoreResiduals() throws IOException { |
| add(table.newAppend(), files("A")); |
| add(table.newAppend(), files("B")); |
| add(table.newAppend(), files("C")); |
| |
| TableScan scan1 = table.newScan() |
| .filter(Expressions.equal("id", 5)) |
| .appendsBetween(1, 3); |
| |
| try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) { |
| Assert.assertTrue("Tasks should not be empty", com.google.common.collect.Iterables.size(tasks) > 0); |
| for (CombinedScanTask combinedScanTask : tasks) { |
| for (FileScanTask fileScanTask : combinedScanTask.files()) { |
| Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual()); |
| } |
| } |
| } |
| |
| TableScan scan2 = table.newScan() |
| .filter(Expressions.equal("id", 5)) |
| .appendsBetween(1, 3) |
| .ignoreResiduals(); |
| |
| try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) { |
| Assert.assertTrue("Tasks should not be empty", com.google.common.collect.Iterables.size(tasks) > 0); |
| for (CombinedScanTask combinedScanTask : tasks) { |
| for (FileScanTask fileScanTask : combinedScanTask.files()) { |
| Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual()); |
| } |
| } |
| } |
| } |
| |
| private static DataFile file(String name) { |
| return DataFiles.builder(SPEC) |
| .withPath(name + ".parquet") |
| .withFileSizeInBytes(10) |
| .withPartitionPath("data_bucket=0") // easy way to set partition data for now |
| .withRecordCount(1) |
| .build(); |
| } |
| |
| private static void add(AppendFiles appendFiles, List<DataFile> adds) { |
| for (DataFile f : adds) { |
| appendFiles.appendFile(f); |
| } |
| appendFiles.commit(); |
| } |
| |
| private static void delete(DeleteFiles deleteFiles, List<DataFile> deletes) { |
| for (DataFile f : deletes) { |
| deleteFiles.deleteFile(f); |
| } |
| deleteFiles.commit(); |
| } |
| |
| private static void replace(RewriteFiles rewriteFiles, List<DataFile> deletes, List<DataFile> adds) { |
| rewriteFiles.rewriteFiles(Sets.newHashSet(deletes), Sets.newHashSet(adds)); |
| rewriteFiles.commit(); |
| } |
| |
| private static void overwrite(OverwriteFiles overwriteFiles, List<DataFile> adds, List<DataFile> deletes) { |
| for (DataFile f : adds) { |
| overwriteFiles.addFile(f); |
| } |
| for (DataFile f : deletes) { |
| overwriteFiles.deleteFile(f); |
| } |
| overwriteFiles.commit(); |
| } |
| |
| private static List<DataFile> files(String... names) { |
| return Lists.transform(Lists.newArrayList(names), TestIncrementalDataTableScan::file); |
| } |
| |
| private List<String> appendsAfterScan(long fromSnapshotId) { |
| final TableScan appendsAfter = table.newScan().appendsAfter(fromSnapshotId); |
| return filesToScan(appendsAfter); |
| } |
| |
| private List<String> appendsBetweenScan(long fromSnapshotId, long toSnapshotId) { |
| Snapshot s1 = table.snapshot(fromSnapshotId); |
| Snapshot s2 = table.snapshot(toSnapshotId); |
| TableScan appendsBetween = table.newScan().appendsBetween(s1.snapshotId(), s2.snapshotId()); |
| return filesToScan(appendsBetween); |
| } |
| |
| private static List<String> filesToScan(TableScan tableScan) { |
| Iterable<String> filesToRead = Iterables.transform(tableScan.planFiles(), t -> { |
| String path = t.file().path().toString(); |
| return path.split("\\.")[0]; |
| }); |
| return Lists.newArrayList(filesToRead); |
| } |
| |
| private static void filesMatch(List<String> expected, List<String> actual) { |
| Collections.sort(expected); |
| Collections.sort(actual); |
| Assert.assertEquals(expected, actual); |
| } |
| } |