blob: 5729a04a4201ab965053a3292b9ef2f2aefe7899 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.types.Types.NestedField.optional;
public abstract class TestExpireSnapshotsAction extends SparkTestBase {
private static final HadoopTables TABLES = new HadoopTables(new Configuration());
private static final Schema SCHEMA = new Schema(
optional(1, "c1", Types.IntegerType.get()),
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get())
);
private static final int SHUFFLE_PARTITIONS = 2;
private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
static final DataFile FILE_A = DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=0") // easy way to set partition data for now
.withRecordCount(1)
.build();
static final DataFile FILE_B = DataFiles.builder(SPEC)
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=1") // easy way to set partition data for now
.withRecordCount(1)
.build();
static final DataFile FILE_C = DataFiles.builder(SPEC)
.withPath("/path/to/data-c.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=2") // easy way to set partition data for now
.withRecordCount(1)
.build();
static final DataFile FILE_D = DataFiles.builder(SPEC)
.withPath("/path/to/data-d.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=3") // easy way to set partition data for now
.withRecordCount(1)
.build();
@Rule
public TemporaryFolder temp = new TemporaryFolder();
private File tableDir;
private String tableLocation;
private Table table;
@Before
public void setupTableLocation() throws Exception {
this.tableDir = temp.newFolder();
this.tableLocation = tableDir.toURI().toString();
this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
spark.conf().set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS);
}
private Long rightAfterSnapshot() {
return rightAfterSnapshot(table.currentSnapshot().snapshotId());
}
private Long rightAfterSnapshot(long snapshotId) {
Long end = System.currentTimeMillis();
while (end <= table.snapshot(snapshotId).timestampMillis()) {
end = System.currentTimeMillis();
}
return end;
}
private void checkExpirationResults(long expectedDatafiles, long expectedManifestsDeleted,
long expectedManifestListsDeleted, ExpireSnapshotsActionResult results) {
Assert.assertEquals("Incorrect number of manifest files deleted",
(Long) expectedManifestsDeleted, results.manifestFilesDeleted());
Assert.assertEquals("Incorrect number of datafiles deleted",
(Long) expectedDatafiles, results.dataFilesDeleted());
Assert.assertEquals("Incorrect number of manifest lists deleted",
(Long) expectedManifestListsDeleted, results.manifestListsDeleted());
}
@Test
public void testFilesCleaned() throws Exception {
table.newFastAppend()
.appendFile(FILE_A)
.commit();
table.newOverwrite()
.deleteFile(FILE_A)
.addFile(FILE_B)
.commit();
table.newFastAppend()
.appendFile(FILE_C)
.commit();
long end = rightAfterSnapshot();
ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute();
Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots()));
checkExpirationResults(1L, 1L, 2L, results);
}
@Test
public void dataFilesCleanupWithParallelTasks() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
.commit();
table.newFastAppend()
.appendFile(FILE_B)
.commit();
table.newRewrite()
.rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D))
.commit();
table.newRewrite()
.rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C))
.commit();
long t4 = rightAfterSnapshot();
Set<String> deletedFiles = Sets.newHashSet();
Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.executeDeleteWith(Executors.newFixedThreadPool(4, runnable -> {
Thread thread = new Thread(runnable);
thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement());
thread.setDaemon(true); // daemon threads will be terminated abruptly when the JVM exits
return thread;
}))
.expireOlderThan(t4)
.deleteWith(s -> {
deleteThreads.add(Thread.currentThread().getName());
deletedFiles.add(s);
})
.execute();
// Verifies that the delete methods ran in the threads created by the provided ExecutorService ThreadFactory
Assert.assertEquals(deleteThreads,
Sets.newHashSet("remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3"));
Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
checkExpirationResults(2L, 3L, 3L, result);
}
@Test
public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception {
table.newFastAppend()
.appendFile(FILE_A)
.commit();
ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().execute();
checkExpirationResults(0L, 0L, 0L, results);
}
@Test
public void testCleanupRepeatedOverwrites() throws Exception {
table.newFastAppend()
.appendFile(FILE_A)
.commit();
for (int i = 0; i < 10; i++) {
table.newOverwrite()
.deleteFile(FILE_A)
.addFile(FILE_B)
.commit();
table.newOverwrite()
.deleteFile(FILE_B)
.addFile(FILE_A)
.commit();
}
long end = rightAfterSnapshot();
ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute();
checkExpirationResults(1L, 39L, 20L, results);
}
@Test
public void testRetainLastWithExpireOlderThan() {
table.newAppend()
.appendFile(FILE_A) // data_bucket=0
.commit();
long firstSnapshotId = table.currentSnapshot().snapshotId();
long t1 = System.currentTimeMillis();
while (t1 <= table.currentSnapshot().timestampMillis()) {
t1 = System.currentTimeMillis();
}
table.newAppend()
.appendFile(FILE_B) // data_bucket=1
.commit();
table.newAppend()
.appendFile(FILE_C) // data_bucket=2
.commit();
long t3 = rightAfterSnapshot();
// Retain last 2 snapshots
Actions.forTable(table).expireSnapshots()
.expireOlderThan(t3)
.retainLast(2)
.execute();
Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size());
Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId));
}
@Test
public void testExpireTwoSnapshotsById() throws Exception {
table.newAppend()
.appendFile(FILE_A) // data_bucket=0
.commit();
long firstSnapshotId = table.currentSnapshot().snapshotId();
table.newAppend()
.appendFile(FILE_B) // data_bucket=1
.commit();
long secondSnapshotID = table.currentSnapshot().snapshotId();
table.newAppend()
.appendFile(FILE_C) // data_bucket=2
.commit();
// Retain last 2 snapshots
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireSnapshotId(firstSnapshotId)
.expireSnapshotId(secondSnapshotID)
.execute();
Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size());
Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId));
Assert.assertEquals("Second snapshot should not be present.", null, table.snapshot(secondSnapshotID));
checkExpirationResults(0L, 0L, 2L, result);
}
@Test
public void testRetainLastWithExpireById() {
table.newAppend()
.appendFile(FILE_A) // data_bucket=0
.commit();
long firstSnapshotId = table.currentSnapshot().snapshotId();
table.newAppend()
.appendFile(FILE_B) // data_bucket=1
.commit();
table.newAppend()
.appendFile(FILE_C) // data_bucket=2
.commit();
// Retain last 3 snapshots, but explicitly remove the first snapshot
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireSnapshotId(firstSnapshotId)
.retainLast(3)
.execute();
Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size());
Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId));
checkExpirationResults(0L, 0L, 1L, result);
}
@Test
public void testRetainLastWithTooFewSnapshots() {
table.newAppend()
.appendFile(FILE_A) // data_bucket=0
.appendFile(FILE_B) // data_bucket=1
.commit();
long firstSnapshotId = table.currentSnapshot().snapshotId();
table.newAppend()
.appendFile(FILE_C) // data_bucket=2
.commit();
long t2 = rightAfterSnapshot();
// Retain last 3 snapshots
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(t2)
.retainLast(3)
.execute();
Assert.assertEquals("Should have two snapshots", 2, Lists.newArrayList(table.snapshots()).size());
Assert.assertEquals("First snapshot should still present",
firstSnapshotId, table.snapshot(firstSnapshotId).snapshotId());
checkExpirationResults(0L, 0L, 0L, result);
}
@Test
public void testRetainLastKeepsExpiringSnapshot() {
table.newAppend()
.appendFile(FILE_A) // data_bucket=0
.commit();
table.newAppend()
.appendFile(FILE_B) // data_bucket=1
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
table.newAppend()
.appendFile(FILE_C) // data_bucket=2
.commit();
table.newAppend()
.appendFile(FILE_D) // data_bucket=3
.commit();
// Retain last 2 snapshots and expire older than t3
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(secondSnapshot.timestampMillis())
.retainLast(2)
.execute();
Assert.assertEquals("Should have three snapshots.", 3, Lists.newArrayList(table.snapshots()).size());
Assert.assertNotNull("Second snapshot should present.", table.snapshot(secondSnapshot.snapshotId()));
checkExpirationResults(0L, 0L, 1L, result);
}
@Test
public void testExpireSnapshotsWithDisabledGarbageCollection() {
table.updateProperties()
.set(TableProperties.GC_ENABLED, "false")
.commit();
table.newAppend()
.appendFile(FILE_A)
.commit();
Actions actions = Actions.forTable(table);
AssertHelpers.assertThrows("Should complain about expiring snapshots",
ValidationException.class, "Cannot expire snapshots: GC is disabled",
actions::expireSnapshots);
}
@Test
public void testExpireOlderThanMultipleCalls() {
table.newAppend()
.appendFile(FILE_A) // data_bucket=0
.commit();
table.newAppend()
.appendFile(FILE_B) // data_bucket=1
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
table.newAppend()
.appendFile(FILE_C) // data_bucket=2
.commit();
Snapshot thirdSnapshot = table.currentSnapshot();
// Retain last 2 snapshots and expire older than t3
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(secondSnapshot.timestampMillis())
.expireOlderThan(thirdSnapshot.timestampMillis())
.execute();
Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size());
Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId()));
checkExpirationResults(0L, 0L, 2L, result);
}
@Test
public void testRetainLastMultipleCalls() {
table.newAppend()
.appendFile(FILE_A) // data_bucket=0
.commit();
table.newAppend()
.appendFile(FILE_B) // data_bucket=1
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
table.newAppend()
.appendFile(FILE_C) // data_bucket=2
.commit();
long t3 = rightAfterSnapshot();
// Retain last 2 snapshots and expire older than t3
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(t3)
.retainLast(2)
.retainLast(1)
.execute();
Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size());
Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId()));
checkExpirationResults(0L, 0L, 2L, result);
}
@Test
public void testRetainZeroSnapshots() {
AssertHelpers.assertThrows("Should fail retain 0 snapshots " +
"because number of snapshots to retain cannot be zero",
IllegalArgumentException.class,
"Number of snapshots to retain must be at least 1, cannot be: 0",
() -> Actions.forTable(table).expireSnapshots().retainLast(0).execute());
}
@Test
public void testScanExpiredManifestInValidSnapshotAppend() {
table.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
table.newOverwrite()
.addFile(FILE_C)
.deleteFile(FILE_A)
.commit();
table.newAppend()
.appendFile(FILE_D)
.commit();
long t3 = rightAfterSnapshot();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(t3)
.deleteWith(deletedFiles::add)
.execute();
Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
checkExpirationResults(1L, 1L, 2L, result);
}
@Test
public void testScanExpiredManifestInValidSnapshotFastAppend() {
table.updateProperties()
.set(TableProperties.MANIFEST_MERGE_ENABLED, "true")
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1")
.commit();
table.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
table.newOverwrite()
.addFile(FILE_C)
.deleteFile(FILE_A)
.commit();
table.newFastAppend()
.appendFile(FILE_D)
.commit();
long t3 = rightAfterSnapshot();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(t3)
.deleteWith(deletedFiles::add)
.execute();
Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
checkExpirationResults(1L, 1L, 2L, result);
}
/**
* Test on table below, and expiring the staged commit `B` using `expireOlderThan` API.
* Table: A - C
* ` B (staged)
*/
@Test
public void testWithExpiringDanglingStageCommit() {
// `A` commit
table.newAppend()
.appendFile(FILE_A)
.commit();
// `B` staged commit
table.newAppend()
.appendFile(FILE_B)
.stageOnly()
.commit();
TableMetadata base = ((BaseTable) table).operations().current();
Snapshot snapshotA = base.snapshots().get(0);
Snapshot snapshotB = base.snapshots().get(1);
// `C` commit
table.newAppend()
.appendFile(FILE_C)
.commit();
Set<String> deletedFiles = new HashSet<>();
// Expire all commits including dangling staged snapshot.
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.deleteWith(deletedFiles::add)
.expireOlderThan(snapshotB.timestampMillis() + 1)
.execute();
checkExpirationResults(1L, 1L, 2L, result);
Set<String> expectedDeletes = new HashSet<>();
expectedDeletes.add(snapshotA.manifestListLocation());
// Files should be deleted of dangling staged snapshot
snapshotB.addedFiles().forEach(i -> {
expectedDeletes.add(i.path().toString());
});
// ManifestList should be deleted too
expectedDeletes.add(snapshotB.manifestListLocation());
snapshotB.dataManifests().forEach(file -> {
// Only the manifest of B should be deleted.
if (file.snapshotId() == snapshotB.snapshotId()) {
expectedDeletes.add(file.path());
}
});
Assert.assertSame("Files deleted count should be expected", expectedDeletes.size(), deletedFiles.size());
// Take the diff
expectedDeletes.removeAll(deletedFiles);
Assert.assertTrue("Exactly same files should be deleted", expectedDeletes.isEmpty());
}
/**
* Expire cherry-pick the commit as shown below, when `B` is in table's current state
* Table:
* A - B - C <--current snapshot
* `- D (source=B)
*/
@Test
public void testWithCherryPickTableSnapshot() {
// `A` commit
table.newAppend()
.appendFile(FILE_A)
.commit();
Snapshot snapshotA = table.currentSnapshot();
// `B` commit
Set<String> deletedAFiles = new HashSet<>();
table.newOverwrite()
.addFile(FILE_B)
.deleteFile(FILE_A)
.deleteWith(deletedAFiles::add)
.commit();
Assert.assertTrue("No files should be physically deleted", deletedAFiles.isEmpty());
// pick the snapshot 'B`
Snapshot snapshotB = table.currentSnapshot();
// `C` commit to let cherry-pick take effect, and avoid fast-forward of `B` with cherry-pick
table.newAppend()
.appendFile(FILE_C)
.commit();
Snapshot snapshotC = table.currentSnapshot();
// Move the table back to `A`
table.manageSnapshots()
.setCurrentSnapshot(snapshotA.snapshotId())
.commit();
// Generate A -> `D (B)`
table.manageSnapshots()
.cherrypick(snapshotB.snapshotId())
.commit();
Snapshot snapshotD = table.currentSnapshot();
// Move the table back to `C`
table.manageSnapshots()
.setCurrentSnapshot(snapshotC.snapshotId())
.commit();
List<String> deletedFiles = new ArrayList<>();
// Expire `C`
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.deleteWith(deletedFiles::add)
.expireOlderThan(snapshotC.timestampMillis() + 1)
.execute();
// Make sure no dataFiles are deleted for the B, C, D snapshot
Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
i.addedFiles().forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
checkExpirationResults(1L, 2L, 2L, result);
}
/**
* Test on table below, and expiring `B` which is not in current table state.
* 1) Expire `B`
* 2) All commit
* Table: A - C - D (B)
* ` B (staged)
*/
@Test
public void testWithExpiringStagedThenCherrypick() {
// `A` commit
table.newAppend()
.appendFile(FILE_A)
.commit();
// `B` commit
table.newAppend()
.appendFile(FILE_B)
.stageOnly()
.commit();
// pick the snapshot that's staged but not committed
TableMetadata base = ((BaseTable) table).operations().current();
Snapshot snapshotB = base.snapshots().get(1);
// `C` commit to let cherry-pick take effect, and avoid fast-forward of `B` with cherry-pick
table.newAppend()
.appendFile(FILE_C)
.commit();
// `D (B)` cherry-pick commit
table.manageSnapshots()
.cherrypick(snapshotB.snapshotId())
.commit();
base = ((BaseTable) table).operations().current();
Snapshot snapshotD = base.snapshots().get(3);
List<String> deletedFiles = new ArrayList<>();
// Expire `B` commit.
ExpireSnapshotsActionResult firstResult = Actions.forTable(table).expireSnapshots()
.deleteWith(deletedFiles::add)
.expireSnapshotId(snapshotB.snapshotId())
.execute();
// Make sure no dataFiles are deleted for the staged snapshot
Lists.newArrayList(snapshotB).forEach(i -> {
i.addedFiles().forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
checkExpirationResults(0L, 1L, 1L, firstResult);
// Expire all snapshots including cherry-pick
ExpireSnapshotsActionResult secondResult = Actions.forTable(table).expireSnapshots()
.deleteWith(deletedFiles::add)
.expireOlderThan(table.currentSnapshot().timestampMillis() + 1)
.execute();
// Make sure no dataFiles are deleted for the staged and cherry-pick
Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
i.addedFiles().forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
checkExpirationResults(0L, 0L, 2L, secondResult);
}
@Test
public void testExpireOlderThan() {
table.newAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
rightAfterSnapshot();
table.newAppend()
.appendFile(FILE_B)
.commit();
long snapshotId = table.currentSnapshot().snapshotId();
long tAfterCommits = rightAfterSnapshot();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(tAfterCommits)
.deleteWith(deletedFiles::add)
.execute();
Assert.assertEquals("Expire should not change current snapshot", snapshotId, table.currentSnapshot().snapshotId());
Assert.assertNull("Expire should remove the oldest snapshot", table.snapshot(firstSnapshot.snapshotId()));
Assert.assertEquals("Should remove only the expired manifest list location",
Sets.newHashSet(firstSnapshot.manifestListLocation()), deletedFiles);
checkExpirationResults(0, 0, 1, result);
}
@Test
public void testExpireOlderThanWithDelete() {
table.newAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
rightAfterSnapshot();
table.newDelete()
.deleteFile(FILE_A)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create replace manifest with a rewritten manifest",
1, secondSnapshot.allManifests().size());
table.newAppend()
.appendFile(FILE_B)
.commit();
rightAfterSnapshot();
long snapshotId = table.currentSnapshot().snapshotId();
long tAfterCommits = rightAfterSnapshot();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(tAfterCommits)
.deleteWith(deletedFiles::add)
.execute();
Assert.assertEquals("Expire should not change current snapshot", snapshotId, table.currentSnapshot().snapshotId());
Assert.assertNull("Expire should remove the oldest snapshot", table.snapshot(firstSnapshot.snapshotId()));
Assert.assertNull("Expire should remove the second oldest snapshot", table.snapshot(secondSnapshot.snapshotId()));
Assert.assertEquals("Should remove expired manifest lists and deleted data file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
secondSnapshot.allManifests().get(0).path(), // manifest contained only deletes, was dropped
FILE_A.path()), // deleted
deletedFiles);
checkExpirationResults(1, 2, 2, result);
}
@Test
public void testExpireOlderThanWithDeleteInMergedManifests() {
// merge every commit
table.updateProperties()
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
.commit();
table.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
rightAfterSnapshot();
table.newDelete()
.deleteFile(FILE_A) // FILE_B is still in the dataset
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should replace manifest with a rewritten manifest",
1, secondSnapshot.allManifests().size());
table.newFastAppend() // do not merge to keep the last snapshot's manifest valid
.appendFile(FILE_C)
.commit();
rightAfterSnapshot();
long snapshotId = table.currentSnapshot().snapshotId();
long tAfterCommits = rightAfterSnapshot();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(tAfterCommits)
.deleteWith(deletedFiles::add)
.execute();
Assert.assertEquals("Expire should not change current snapshot", snapshotId, table.currentSnapshot().snapshotId());
Assert.assertNull("Expire should remove the oldest snapshot", table.snapshot(firstSnapshot.snapshotId()));
Assert.assertNull("Expire should remove the second oldest snapshot", table.snapshot(secondSnapshot.snapshotId()));
Assert.assertEquals("Should remove expired manifest lists and deleted data file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
FILE_A.path()), // deleted
deletedFiles);
checkExpirationResults(1, 1, 2, result);
}
@Test
public void testExpireOlderThanWithRollback() {
// merge every commit
table.updateProperties()
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
.commit();
table.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
rightAfterSnapshot();
table.newDelete()
.deleteFile(FILE_B)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
table.manageSnapshots()
.rollbackTo(firstSnapshot.snapshotId())
.commit();
long tAfterCommits = rightAfterSnapshot(secondSnapshot.snapshotId());
long snapshotId = table.currentSnapshot().snapshotId();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(tAfterCommits)
.deleteWith(deletedFiles::add)
.execute();
Assert.assertEquals("Expire should not change current snapshot", snapshotId, table.currentSnapshot().snapshotId());
Assert.assertNotNull("Expire should keep the oldest snapshot, current", table.snapshot(firstSnapshot.snapshotId()));
Assert.assertNull("Expire should remove the orphaned snapshot", table.snapshot(secondSnapshot.snapshotId()));
Assert.assertEquals("Should remove expired manifest lists and reverted appended data file",
Sets.newHashSet(
secondSnapshot.manifestListLocation(), // snapshot expired
Iterables.getOnlyElement(secondSnapshotManifests).path()), // manifest is no longer referenced
deletedFiles);
checkExpirationResults(0, 1, 1, result);
}
@Test
public void testExpireOlderThanWithRollbackAndMergedManifests() {
table.newAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
rightAfterSnapshot();
table.newAppend()
.appendFile(FILE_B)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
table.manageSnapshots()
.rollbackTo(firstSnapshot.snapshotId())
.commit();
long tAfterCommits = rightAfterSnapshot(secondSnapshot.snapshotId());
long snapshotId = table.currentSnapshot().snapshotId();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(tAfterCommits)
.deleteWith(deletedFiles::add)
.execute();
Assert.assertEquals("Expire should not change current snapshot", snapshotId, table.currentSnapshot().snapshotId());
Assert.assertNotNull("Expire should keep the oldest snapshot, current", table.snapshot(firstSnapshot.snapshotId()));
Assert.assertNull("Expire should remove the orphaned snapshot", table.snapshot(secondSnapshot.snapshotId()));
Assert.assertEquals("Should remove expired manifest lists and reverted appended data file",
Sets.newHashSet(
secondSnapshot.manifestListLocation(), // snapshot expired
Iterables.getOnlyElement(secondSnapshotManifests).path(), // manifest is no longer referenced
FILE_B.path()), // added, but rolled back
deletedFiles);
checkExpirationResults(1, 1, 1, result);
}
@Test
public void testExpireOnEmptyTable() {
Set<String> deletedFiles = Sets.newHashSet();
// table has no data, testing ExpireSnapshots should not fail with no snapshot
ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
.expireOlderThan(System.currentTimeMillis())
.deleteWith(deletedFiles::add)
.execute();
checkExpirationResults(0, 0, 0, result);
}
@Test
public void testExpireAction() {
table.newAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
rightAfterSnapshot();
table.newAppend()
.appendFile(FILE_B)
.commit();
long snapshotId = table.currentSnapshot().snapshotId();
long tAfterCommits = rightAfterSnapshot();
Set<String> deletedFiles = Sets.newHashSet();
ExpireSnapshotsAction action = Actions.forTable(table).expireSnapshots()
.expireOlderThan(tAfterCommits)
.deleteWith(deletedFiles::add);
Dataset<Row> pendingDeletes = action.expire();
List<Row> pending = pendingDeletes.collectAsList();
Assert.assertEquals("Should not change current snapshot", snapshotId, table.currentSnapshot().snapshotId());
Assert.assertNull("Should remove the oldest snapshot", table.snapshot(firstSnapshot.snapshotId()));
Assert.assertEquals("Pending deletes should contain one row", 1, pending.size());
Assert.assertEquals("Pending delete should be the expired manifest list location",
firstSnapshot.manifestListLocation(), pending.get(0).getString(0));
Assert.assertEquals("Pending delete should be a manifest list",
"Manifest List", pending.get(0).getString(1));
Assert.assertEquals("Should not delete any files", 0, deletedFiles.size());
Assert.assertSame("Multiple calls to expire should return the same deleted files",
pendingDeletes, action.expire());
}
@Test
public void testUseLocalIterator() {
table.newFastAppend()
.appendFile(FILE_A)
.commit();
table.newOverwrite()
.deleteFile(FILE_A)
.addFile(FILE_B)
.commit();
table.newFastAppend()
.appendFile(FILE_C)
.commit();
long end = rightAfterSnapshot();
int jobsBefore = spark.sparkContext().dagScheduler().nextJobId().get();
ExpireSnapshotsActionResult results =
Actions.forTable(table).expireSnapshots().expireOlderThan(end).streamDeleteResults(true).execute();
Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots()));
int jobsAfter = spark.sparkContext().dagScheduler().nextJobId().get();
int totalJobsRun = jobsAfter - jobsBefore;
checkExpirationResults(1L, 1L, 2L, results);
Assert.assertTrue(
String.format("Expected more than %d jobs when using local iterator, ran %d", SHUFFLE_PARTITIONS, totalJobsRun),
totalJobsRun > SHUFFLE_PARTITIONS);
}
}