| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| import static org.assertj.core.api.Assumptions.assumeThat; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| import org.apache.iceberg.ManifestEntry.Status; |
| import org.apache.iceberg.exceptions.ValidationException; |
| import org.apache.iceberg.io.FileIO; |
| import org.apache.iceberg.io.PositionOutputStream; |
| import org.apache.iceberg.puffin.Blob; |
| import org.apache.iceberg.puffin.Puffin; |
| import org.apache.iceberg.puffin.PuffinWriter; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; |
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.junit.jupiter.api.TestTemplate; |
| import org.junit.jupiter.api.extension.ExtendWith; |
| |
| @ExtendWith(ParameterizedTestExtension.class) |
| public class TestRemoveSnapshots extends TestBase { |
| @Parameter(index = 1) |
| private boolean incrementalCleanup; |
| |
| @Parameters(name = "formatVersion = {0}, incrementalCleanup = {1}") |
| protected static List<Object> parameters() { |
| return Arrays.asList( |
| new Object[] {1, true}, |
| new Object[] {2, true}, |
| new Object[] {1, false}, |
| new Object[] {2, false}); |
| } |
| |
| private long waitUntilAfter(long timestampMillis) { |
| long current = System.currentTimeMillis(); |
| while (current <= timestampMillis) { |
| current = System.currentTimeMillis(); |
| } |
| return current; |
| } |
| |
| @TestTemplate |
| public void testExpireOlderThan() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| Snapshot firstSnapshot = table.currentSnapshot(); |
| |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); |
| assertThat(table.snapshot(firstSnapshot.snapshotId())).isNull(); |
| assertThat(deletedFiles).containsExactly(firstSnapshot.manifestListLocation()); |
| } |
| |
| @TestTemplate |
| public void testExpireOlderThanWithDelete() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| Snapshot firstSnapshot = table.currentSnapshot(); |
| assertThat(firstSnapshot.allManifests(table.io())).hasSize(1); |
| |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| table.newDelete().deleteFile(FILE_A).commit(); |
| |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| assertThat(secondSnapshot.allManifests(table.io())) |
| .as("Should create replace manifest with a rewritten manifest") |
| .hasSize(1); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); |
| assertThat(table.snapshot(firstSnapshot.snapshotId())).isNull(); |
| assertThat(table.snapshot(secondSnapshot.snapshotId())).isNull(); |
| |
| assertThat(deletedFiles) |
| .as("Should remove expired manifest lists and deleted data file") |
| .isEqualTo( |
| Sets.newHashSet( |
| firstSnapshot.manifestListLocation(), // snapshot expired |
| firstSnapshot |
| .allManifests(table.io()) |
| .get(0) |
| .path(), // manifest was rewritten for delete |
| secondSnapshot.manifestListLocation(), // snapshot expired |
| secondSnapshot |
| .allManifests(table.io()) |
| .get(0) |
| .path(), // manifest contained only deletes, was dropped |
| FILE_A.path() // deleted |
| )); |
| } |
| |
| @TestTemplate |
| public void testExpireOlderThanWithDeleteInMergedManifests() { |
| // merge every commit |
| table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0").commit(); |
| |
| table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); |
| |
| Snapshot firstSnapshot = table.currentSnapshot(); |
| assertThat(firstSnapshot.allManifests(table.io())).hasSize(1); |
| |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| table |
| .newDelete() |
| .deleteFile(FILE_A) // FILE_B is still in the dataset |
| .commit(); |
| |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| assertThat(secondSnapshot.allManifests(table.io())) |
| .as("Should replace manifest with a rewritten manifest") |
| .hasSize(1); |
| |
| table |
| .newFastAppend() // do not merge to keep the last snapshot's manifest valid |
| .appendFile(FILE_C) |
| .commit(); |
| |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); |
| assertThat(table.snapshot(firstSnapshot.snapshotId())).isNull(); |
| assertThat(table.snapshot(secondSnapshot.snapshotId())).isNull(); |
| |
| assertThat(deletedFiles) |
| .as("Should remove expired manifest lists and deleted data file") |
| .isEqualTo( |
| Sets.newHashSet( |
| firstSnapshot.manifestListLocation(), // snapshot expired |
| firstSnapshot |
| .allManifests(table.io()) |
| .get(0) |
| .path(), // manifest was rewritten for delete |
| secondSnapshot.manifestListLocation(), // snapshot expired |
| FILE_A.path() // deleted |
| )); |
| } |
| |
| @TestTemplate |
| public void testExpireOlderThanWithRollback() { |
| // merge every commit |
| table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0").commit(); |
| |
| table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); |
| |
| Snapshot firstSnapshot = table.currentSnapshot(); |
| assertThat(firstSnapshot.allManifests(table.io())).hasSize(1); |
| |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| table.newDelete().deleteFile(FILE_B).commit(); |
| |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| Set<ManifestFile> secondSnapshotManifests = |
| Sets.newHashSet(secondSnapshot.allManifests(table.io())); |
| secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io())); |
| assertThat(secondSnapshotManifests).hasSize(1); |
| |
| table.manageSnapshots().rollbackTo(firstSnapshot.snapshotId()).commit(); |
| |
| long tAfterCommits = waitUntilAfter(secondSnapshot.timestampMillis()); |
| |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); |
| assertThat(table.snapshot(firstSnapshot.snapshotId())) |
| .as("Expire should keep the oldest snapshot, current") |
| .isNotNull(); |
| assertThat(table.snapshot(secondSnapshot.snapshotId())) |
| .as("Expire should remove the orphaned snapshot") |
| .isNull(); |
| |
| assertThat(deletedFiles) |
| .as("Should remove expired manifest lists and reverted appended data file") |
| .isEqualTo( |
| Sets.newHashSet( |
| secondSnapshot.manifestListLocation(), // snapshot expired |
| Iterables.getOnlyElement(secondSnapshotManifests) |
| .path()) // manifest is no longer referenced |
| ); |
| } |
| |
| @TestTemplate |
| public void testExpireOlderThanWithRollbackAndMergedManifests() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| Snapshot firstSnapshot = table.currentSnapshot(); |
| assertThat(firstSnapshot.allManifests(table.io())).hasSize(1); |
| |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| Set<ManifestFile> secondSnapshotManifests = |
| Sets.newHashSet(secondSnapshot.allManifests(table.io())); |
| secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io())); |
| assertThat(secondSnapshotManifests).hasSize(1); |
| |
| table.manageSnapshots().rollbackTo(firstSnapshot.snapshotId()).commit(); |
| |
| long tAfterCommits = waitUntilAfter(secondSnapshot.timestampMillis()); |
| |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(table.currentSnapshot().snapshotId()).isEqualTo(snapshotId); |
| assertThat(table.snapshot(firstSnapshot.snapshotId())) |
| .as("Expire should keep the oldest snapshot, current") |
| .isNotNull(); |
| assertThat(table.snapshot(secondSnapshot.snapshotId())) |
| .as("Expire should remove the orphaned snapshot") |
| .isNull(); |
| |
| assertThat(deletedFiles) |
| .as("Should remove expired manifest lists and reverted appended data file") |
| .isEqualTo( |
| Sets.newHashSet( |
| secondSnapshot.manifestListLocation(), // snapshot expired |
| secondSnapshotManifests.stream() |
| .findFirst() |
| .get() |
| .path(), // manifest is no longer referenced |
| FILE_B.path()) // added, but rolled back |
| ); |
| } |
| |
| @TestTemplate |
| public void testRetainLastWithExpireOlderThan() { |
| long t0 = System.currentTimeMillis(); |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .commit(); |
| long firstSnapshotId = table.currentSnapshot().snapshotId(); |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| // Retain last 2 snapshots |
| removeSnapshots(table).expireOlderThan(t3).retainLast(2).commit(); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(table.snapshot(firstSnapshotId)).isNull(); |
| } |
| |
| @TestTemplate |
| public void testRetainLastWithExpireById() { |
| long t0 = System.currentTimeMillis(); |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .commit(); |
| long firstSnapshotId = table.currentSnapshot().snapshotId(); |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| // Retain last 3 snapshots, but explicitly remove the first snapshot |
| table.expireSnapshots().expireSnapshotId(firstSnapshotId).retainLast(3).commit(); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(table.snapshot(firstSnapshotId)).isNull(); |
| } |
| |
| @TestTemplate |
| public void testRetainNAvailableSnapshotsWithTransaction() { |
| long t0 = System.currentTimeMillis(); |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .commit(); |
| long firstSnapshotId = table.currentSnapshot().snapshotId(); |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| assertThat(listManifestFiles(new File(table.location()))).hasSize(3); |
| |
| // Retain last 2 snapshots, which means 1 is deleted. |
| Transaction tx = table.newTransaction(); |
| removeSnapshots(tx.table()).expireOlderThan(t3).retainLast(2).commit(); |
| tx.commitTransaction(); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(table.snapshot(firstSnapshotId)).isNull(); |
| assertThat(listManifestLists(new File(table.location()))).hasSize(2); |
| } |
| |
| @TestTemplate |
| public void testRetainLastWithTooFewSnapshots() { |
| long t0 = System.currentTimeMillis(); |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| long firstSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| // Retain last 3 snapshots |
| removeSnapshots(table).expireOlderThan(t2).retainLast(3).commit(); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(table.snapshot(firstSnapshotId).snapshotId()).isEqualTo(firstSnapshotId); |
| } |
| |
| @TestTemplate |
| public void testRetainNLargerThanCurrentSnapshots() { |
| // Append 3 files |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .commit(); |
| long firstSnapshotId = table.currentSnapshot().snapshotId(); |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| // Retain last 4 snapshots |
| Transaction tx = table.newTransaction(); |
| removeSnapshots(tx.table()).expireOlderThan(t3).retainLast(4).commit(); |
| tx.commitTransaction(); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| } |
| |
| @TestTemplate |
| public void testRetainLastKeepsExpiringSnapshot() { |
| long t0 = System.currentTimeMillis(); |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .commit(); |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_D) // data_bucket=3 |
| .commit(); |
| |
| long t4 = System.currentTimeMillis(); |
| while (t4 <= table.currentSnapshot().timestampMillis()) { |
| t4 = System.currentTimeMillis(); |
| } |
| |
| // Retain last 2 snapshots and expire older than t3 |
| removeSnapshots(table).expireOlderThan(secondSnapshot.timestampMillis()).retainLast(2).commit(); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| assertThat(table.snapshot(secondSnapshot.snapshotId())).isNotNull(); |
| } |
| |
| @TestTemplate |
| public void testExpireOlderThanMultipleCalls() { |
| long t0 = System.currentTimeMillis(); |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .commit(); |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| Snapshot thirdSnapshot = table.currentSnapshot(); |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| // Retain last 2 snapshots and expire older than t3 |
| removeSnapshots(table) |
| .expireOlderThan(secondSnapshot.timestampMillis()) |
| .expireOlderThan(thirdSnapshot.timestampMillis()) |
| .commit(); |
| |
| assertThat(table.snapshots()).hasSize(1); |
| assertThat(table.snapshot(secondSnapshot.snapshotId())).isNull(); |
| } |
| |
| @TestTemplate |
| public void testRetainLastMultipleCalls() { |
| long t0 = System.currentTimeMillis(); |
| table |
| .newAppend() |
| .appendFile(FILE_A) // data_bucket=0 |
| .commit(); |
| long t1 = System.currentTimeMillis(); |
| while (t1 <= table.currentSnapshot().timestampMillis()) { |
| t1 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_B) // data_bucket=1 |
| .commit(); |
| |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| long t2 = System.currentTimeMillis(); |
| while (t2 <= table.currentSnapshot().timestampMillis()) { |
| t2 = System.currentTimeMillis(); |
| } |
| |
| table |
| .newAppend() |
| .appendFile(FILE_C) // data_bucket=2 |
| .commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| // Retain last 2 snapshots and expire older than t3 |
| removeSnapshots(table).expireOlderThan(t3).retainLast(2).retainLast(1).commit(); |
| |
| assertThat(table.snapshots()).hasSize(1); |
| assertThat(table.snapshot(secondSnapshot.snapshotId())).isNull(); |
| } |
| |
| @TestTemplate |
| public void testRetainZeroSnapshots() { |
| assertThatThrownBy(() -> removeSnapshots(table).retainLast(0).commit()) |
| .isInstanceOf(IllegalArgumentException.class) |
| .hasMessage("Number of snapshots to retain must be at least 1, cannot be: 0"); |
| } |
| |
| @TestTemplate |
| public void testScanExpiredManifestInValidSnapshotAppend() { |
| table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); |
| |
| table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit(); |
| |
| table.newAppend().appendFile(FILE_D).commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(deletedFiles).contains(FILE_A.path().toString()); |
| } |
| |
| @TestTemplate |
| public void testScanExpiredManifestInValidSnapshotFastAppend() { |
| table |
| .updateProperties() |
| .set(TableProperties.MANIFEST_MERGE_ENABLED, "true") |
| .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") |
| .commit(); |
| |
| table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); |
| |
| table.newOverwrite().addFile(FILE_C).deleteFile(FILE_A).commit(); |
| |
| table.newFastAppend().appendFile(FILE_D).commit(); |
| |
| long t3 = System.currentTimeMillis(); |
| while (t3 <= table.currentSnapshot().timestampMillis()) { |
| t3 = System.currentTimeMillis(); |
| } |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(deletedFiles).contains(FILE_A.path().toString()); |
| } |
| |
| @TestTemplate |
| public void dataFilesCleanup() throws IOException { |
| table.newFastAppend().appendFile(FILE_A).commit(); |
| |
| table.newFastAppend().appendFile(FILE_B).commit(); |
| |
| table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit(); |
| long thirdSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit(); |
| long fourthSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| long t4 = System.currentTimeMillis(); |
| while (t4 <= table.currentSnapshot().timestampMillis()) { |
| t4 = System.currentTimeMillis(); |
| } |
| |
| List<ManifestFile> manifests = table.currentSnapshot().dataManifests(table.io()); |
| |
| ManifestFile newManifest = |
| writeManifest( |
| "manifest-file-1.avro", |
| manifestEntry(Status.EXISTING, thirdSnapshotId, FILE_C), |
| manifestEntry(Status.EXISTING, fourthSnapshotId, FILE_D)); |
| |
| RewriteManifests rewriteManifests = table.rewriteManifests(); |
| manifests.forEach(rewriteManifests::deleteManifest); |
| rewriteManifests.addManifest(newManifest); |
| rewriteManifests.commit(); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(deletedFiles).contains(FILE_A.path().toString()); |
| assertThat(deletedFiles).contains(FILE_B.path().toString()); |
| } |
| |
| @TestTemplate |
| public void dataFilesCleanupWithParallelTasks() throws IOException { |
| table.newFastAppend().appendFile(FILE_A).commit(); |
| |
| table.newFastAppend().appendFile(FILE_B).commit(); |
| |
| table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit(); |
| long thirdSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit(); |
| long fourthSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| long t4 = System.currentTimeMillis(); |
| while (t4 <= table.currentSnapshot().timestampMillis()) { |
| t4 = System.currentTimeMillis(); |
| } |
| |
| List<ManifestFile> manifests = table.currentSnapshot().dataManifests(table.io()); |
| |
| ManifestFile newManifest = |
| writeManifest( |
| "manifest-file-1.avro", |
| manifestEntry(Status.EXISTING, thirdSnapshotId, FILE_C), |
| manifestEntry(Status.EXISTING, fourthSnapshotId, FILE_D)); |
| |
| RewriteManifests rewriteManifests = table.rewriteManifests(); |
| manifests.forEach(rewriteManifests::deleteManifest); |
| rewriteManifests.addManifest(newManifest); |
| rewriteManifests.commit(); |
| |
| Set<String> deletedFiles = ConcurrentHashMap.newKeySet(); |
| Set<String> deleteThreads = ConcurrentHashMap.newKeySet(); |
| AtomicInteger deleteThreadsIndex = new AtomicInteger(0); |
| AtomicInteger planThreadsIndex = new AtomicInteger(0); |
| |
| removeSnapshots(table) |
| .executeDeleteWith( |
| Executors.newFixedThreadPool( |
| 4, |
| runnable -> { |
| Thread thread = new Thread(runnable); |
| thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement()); |
| thread.setDaemon( |
| true); // daemon threads will be terminated abruptly when the JVM exits |
| return thread; |
| })) |
| .planWith( |
| Executors.newFixedThreadPool( |
| 1, |
| runnable -> { |
| Thread thread = new Thread(runnable); |
| thread.setName("plan-" + planThreadsIndex.getAndIncrement()); |
| thread.setDaemon( |
| true); // daemon threads will be terminated abruptly when the JVM exits |
| return thread; |
| })) |
| .expireOlderThan(t4) |
| .deleteWith( |
| s -> { |
| deleteThreads.add(Thread.currentThread().getName()); |
| deletedFiles.add(s); |
| }) |
| .commit(); |
| |
| // Verifies that the delete methods ran in the threads created by the provided ExecutorService |
| // ThreadFactory |
| assertThat(deleteThreads) |
| .containsExactly( |
| "remove-snapshot-3", "remove-snapshot-2", "remove-snapshot-1", "remove-snapshot-0"); |
| |
| assertThat(deletedFiles).contains(FILE_A.path().toString()); |
| assertThat(deletedFiles).contains(FILE_B.path().toString()); |
| assertThat(planThreadsIndex.get()) |
| .as("Thread should be created in provided pool") |
| .isGreaterThan(0); |
| } |
| |
| @TestTemplate |
| public void noDataFileCleanup() throws IOException { |
| table.newFastAppend().appendFile(FILE_A).commit(); |
| |
| table.newFastAppend().appendFile(FILE_B).commit(); |
| |
| table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit(); |
| |
| table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit(); |
| |
| long t4 = System.currentTimeMillis(); |
| while (t4 <= table.currentSnapshot().timestampMillis()) { |
| t4 = System.currentTimeMillis(); |
| } |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| removeSnapshots(table) |
| .cleanExpiredFiles(false) |
| .expireOlderThan(t4) |
| .deleteWith(deletedFiles::add) |
| .commit(); |
| |
| assertThat(deletedFiles).isEmpty(); |
| } |
| |
| /** |
| * Test on table below, and expiring the staged commit `B` using `expireOlderThan` API. Table: A - |
| * C ` B (staged) |
| */ |
| @TestTemplate |
| public void testWithExpiringDanglingStageCommit() { |
| // `A` commit |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| // `B` staged commit |
| table.newAppend().appendFile(FILE_B).stageOnly().commit(); |
| |
| TableMetadata base = readMetadata(); |
| Snapshot snapshotA = base.snapshots().get(0); |
| Snapshot snapshotB = base.snapshots().get(1); |
| |
| // `C` commit |
| table.newAppend().appendFile(FILE_C).commit(); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| // Expire all commits including dangling staged snapshot. |
| removeSnapshots(table) |
| .deleteWith(deletedFiles::add) |
| .expireOlderThan(snapshotB.timestampMillis() + 1) |
| .commit(); |
| |
| Set<String> expectedDeletes = Sets.newHashSet(); |
| expectedDeletes.add(snapshotA.manifestListLocation()); |
| |
| // Files should be deleted of dangling staged snapshot |
| snapshotB |
| .addedDataFiles(table.io()) |
| .forEach( |
| i -> { |
| expectedDeletes.add(i.path().toString()); |
| }); |
| |
| // ManifestList should be deleted too |
| expectedDeletes.add(snapshotB.manifestListLocation()); |
| snapshotB |
| .dataManifests(table.io()) |
| .forEach( |
| file -> { |
| // Only the manifest of B should be deleted. |
| if (file.snapshotId() == snapshotB.snapshotId()) { |
| expectedDeletes.add(file.path()); |
| } |
| }); |
| assertThat(deletedFiles).isEqualTo(expectedDeletes); |
| // Take the diff |
| expectedDeletes.removeAll(deletedFiles); |
| assertThat(expectedDeletes).isEmpty(); |
| } |
| |
| /** |
| * Expire cherry-pick the commit as shown below, when `B` is in table's current state Table: A - B |
| * - C <--current snapshot `- D (source=B) |
| */ |
| @TestTemplate |
| public void testWithCherryPickTableSnapshot() { |
| // `A` commit |
| table.newAppend().appendFile(FILE_A).commit(); |
| Snapshot snapshotA = table.currentSnapshot(); |
| |
| // `B` commit |
| Set<String> deletedAFiles = Sets.newHashSet(); |
| table.newOverwrite().addFile(FILE_B).deleteFile(FILE_A).deleteWith(deletedAFiles::add).commit(); |
| assertThat(deletedAFiles).isEmpty(); |
| |
| // pick the snapshot 'B` |
| Snapshot snapshotB = readMetadata().currentSnapshot(); |
| |
| // `C` commit to let cherry-pick take effect, and avoid fast-forward of `B` with cherry-pick |
| table.newAppend().appendFile(FILE_C).commit(); |
| Snapshot snapshotC = readMetadata().currentSnapshot(); |
| |
| // Move the table back to `A` |
| table.manageSnapshots().setCurrentSnapshot(snapshotA.snapshotId()).commit(); |
| |
| // Generate A -> `D (B)` |
| table.manageSnapshots().cherrypick(snapshotB.snapshotId()).commit(); |
| Snapshot snapshotD = readMetadata().currentSnapshot(); |
| |
| // Move the table back to `C` |
| table.manageSnapshots().setCurrentSnapshot(snapshotC.snapshotId()).commit(); |
| List<String> deletedFiles = Lists.newArrayList(); |
| |
| // Expire `C` |
| removeSnapshots(table) |
| .deleteWith(deletedFiles::add) |
| .expireOlderThan(snapshotC.timestampMillis() + 1) |
| .commit(); |
| |
| // Make sure no dataFiles are deleted for the B, C, D snapshot |
| Lists.newArrayList(snapshotB, snapshotC, snapshotD) |
| .forEach( |
| i -> { |
| i.addedDataFiles(table.io()) |
| .forEach( |
| item -> { |
| assertThat(deletedFiles).doesNotContain(item.path().toString()); |
| }); |
| }); |
| } |
| |
| /** |
| * Test on table below, and expiring `B` which is not in current table state. 1) Expire `B` 2) All |
| * commit Table: A - C - D (B) ` B (staged) |
| */ |
| @TestTemplate |
| public void testWithExpiringStagedThenCherrypick() { |
| // `A` commit |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| // `B` commit |
| table.newAppend().appendFile(FILE_B).stageOnly().commit(); |
| |
| // pick the snapshot that's staged but not committed |
| TableMetadata base = readMetadata(); |
| Snapshot snapshotB = base.snapshots().get(1); |
| |
| // `C` commit to let cherry-pick take effect, and avoid fast-forward of `B` with cherry-pick |
| table.newAppend().appendFile(FILE_C).commit(); |
| |
| // `D (B)` cherry-pick commit |
| table.manageSnapshots().cherrypick(snapshotB.snapshotId()).commit(); |
| |
| base = readMetadata(); |
| Snapshot snapshotD = base.snapshots().get(3); |
| |
| List<String> deletedFiles = Lists.newArrayList(); |
| |
| // Expire `B` commit. |
| table |
| .expireSnapshots() |
| .deleteWith(deletedFiles::add) |
| .expireSnapshotId(snapshotB.snapshotId()) |
| .commit(); |
| |
| // Make sure no dataFiles are deleted for the staged snapshot |
| Lists.newArrayList(snapshotB) |
| .forEach( |
| i -> { |
| i.addedDataFiles(table.io()) |
| .forEach( |
| item -> { |
| assertThat(deletedFiles).doesNotContain(item.path().toString()); |
| }); |
| }); |
| |
| // Expire all snapshots including cherry-pick |
| removeSnapshots(table) |
| .deleteWith(deletedFiles::add) |
| .expireOlderThan(table.currentSnapshot().timestampMillis() + 1) |
| .commit(); |
| |
| // Make sure no dataFiles are deleted for the staged and cherry-pick |
| Lists.newArrayList(snapshotB, snapshotD) |
| .forEach( |
| i -> { |
| i.addedDataFiles(table.io()) |
| .forEach( |
| item -> { |
| assertThat(deletedFiles).doesNotContain(item.path().toString()); |
| }); |
| }); |
| } |
| |
| @TestTemplate |
| public void testExpireSnapshotsWhenGarbageCollectionDisabled() { |
| table.updateProperties().set(TableProperties.GC_ENABLED, "false").commit(); |
| |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| assertThatThrownBy(() -> table.expireSnapshots()) |
| .isInstanceOf(ValidationException.class) |
| .hasMessageStartingWith("Cannot expire snapshots: GC is disabled"); |
| } |
| |
| @TestTemplate |
| public void testExpireWithDefaultRetainLast() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| table.newAppend().appendFile(FILE_C).commit(); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| |
| table.updateProperties().set(TableProperties.MIN_SNAPSHOTS_TO_KEEP, "3").commit(); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| Snapshot snapshotBeforeExpiration = table.currentSnapshot(); |
| |
| removeSnapshots(table) |
| .expireOlderThan(System.currentTimeMillis()) |
| .deleteWith(deletedFiles::add) |
| .commit(); |
| |
| assertThat(table.currentSnapshot()).isEqualTo(snapshotBeforeExpiration); |
| assertThat(table.snapshots()).hasSize(3); |
| assertThat(deletedFiles).isEmpty(); |
| } |
| |
| @TestTemplate |
| public void testExpireWithDefaultSnapshotAge() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| Snapshot firstSnapshot = table.currentSnapshot(); |
| |
| waitUntilAfter(firstSnapshot.timestampMillis()); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| |
| waitUntilAfter(secondSnapshot.timestampMillis()); |
| |
| table.newAppend().appendFile(FILE_C).commit(); |
| Snapshot thirdSnapshot = table.currentSnapshot(); |
| |
| waitUntilAfter(thirdSnapshot.timestampMillis()); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| |
| table.updateProperties().set(TableProperties.MAX_SNAPSHOT_AGE_MS, "1").commit(); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| |
| // rely solely on default configs |
| removeSnapshots(table).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(table.currentSnapshot()).isEqualTo(thirdSnapshot); |
| assertThat(table.snapshots()).hasSize(1); |
| assertThat(deletedFiles) |
| .containsExactlyInAnyOrder( |
| firstSnapshot.manifestListLocation(), secondSnapshot.manifestListLocation()); |
| } |
| |
| @TestTemplate |
| public void testExpireWithDeleteFiles() { |
| assumeThat(formatVersion).as("Delete files only supported in V2 spec").isEqualTo(2); |
| |
| // Data Manifest => File_A |
| table.newAppend().appendFile(FILE_A).commit(); |
| Snapshot firstSnapshot = table.currentSnapshot(); |
| |
| // Data Manifest => FILE_A |
| // Delete Manifest => FILE_A_DELETES |
| table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); |
| Snapshot secondSnapshot = table.currentSnapshot(); |
| assertThat(secondSnapshot.dataManifests(table.io())).hasSize(1); |
| assertThat(secondSnapshot.deleteManifests(table.io())).hasSize(1); |
| |
| // FILE_A and FILE_A_DELETES move into "DELETED" state |
| table |
| .newRewrite() |
| .rewriteFiles( |
| ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES), // deleted |
| ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_B_DELETES)) // added |
| .validateFromSnapshot(secondSnapshot.snapshotId()) |
| .commit(); |
| Snapshot thirdSnapshot = table.currentSnapshot(); |
| Set<ManifestFile> manifestOfDeletedFiles = |
| thirdSnapshot.allManifests(table.io()).stream() |
| .filter(ManifestFile::hasDeletedFiles) |
| .collect(Collectors.toSet()); |
| assertThat(manifestOfDeletedFiles).hasSize(2); |
| |
| // Need one more commit before manifests of files of DELETED state get cleared from current |
| // snapshot. |
| table.newAppend().appendFile(FILE_C).commit(); |
| |
| Snapshot fourthSnapshot = table.currentSnapshot(); |
| long fourthSnapshotTs = waitUntilAfter(fourthSnapshot.timestampMillis()); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(deletedFiles) |
| .as("Should remove old delete files and delete file manifests") |
| .isEqualTo( |
| ImmutableSet.builder() |
| .add(FILE_A.path()) |
| .add(FILE_A_DELETES.path()) |
| .add(firstSnapshot.manifestListLocation()) |
| .add(secondSnapshot.manifestListLocation()) |
| .add(thirdSnapshot.manifestListLocation()) |
| .addAll(manifestPaths(secondSnapshot, table.io())) |
| .addAll( |
| manifestOfDeletedFiles.stream() |
| .map(ManifestFile::path) |
| .collect(Collectors.toList())) |
| .build()); |
| } |
| |
| @TestTemplate |
| public void testTagExpiration() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| long now = System.currentTimeMillis(); |
| long maxAgeMs = 100; |
| long expirationTime = now + maxAgeMs; |
| |
| table |
| .manageSnapshots() |
| .createTag("tag", table.currentSnapshot().snapshotId()) |
| .setMaxRefAgeMs("tag", maxAgeMs) |
| .commit(); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); |
| |
| waitUntilAfter(expirationTime); |
| |
| removeSnapshots(table).cleanExpiredFiles(false).commit(); |
| |
| assertThat(table.ops().current().ref("tag")).isNull(); |
| assertThat(table.ops().current().ref("branch")).isNotNull(); |
| assertThat(table.ops().current().ref(SnapshotRef.MAIN_BRANCH)).isNotNull(); |
| } |
| |
| @TestTemplate |
| public void testBranchExpiration() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| long now = System.currentTimeMillis(); |
| long maxAgeMs = 100; |
| long expirationTime = now + maxAgeMs; |
| |
| table |
| .manageSnapshots() |
| .createBranch("branch", table.currentSnapshot().snapshotId()) |
| .setMaxRefAgeMs("branch", maxAgeMs) |
| .commit(); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); |
| |
| waitUntilAfter(expirationTime); |
| |
| removeSnapshots(table).cleanExpiredFiles(false).commit(); |
| |
| assertThat(table.ops().current().ref("branch")).isNull(); |
| assertThat(table.ops().current().ref("tag")).isNotNull(); |
| assertThat(table.ops().current().ref(SnapshotRef.MAIN_BRANCH)).isNotNull(); |
| } |
| |
| @TestTemplate |
| public void testIncrementalCleanupFailsWhenExpiringSnapshotId() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| table.newDelete().deleteFile(FILE_A).commit(); |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| table.newAppend().appendFile(FILE_B).commit(); |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots(); |
| |
| assertThatThrownBy( |
| () -> |
| removeSnapshots |
| .withIncrementalCleanup(true) |
| .expireSnapshotId(snapshotId) |
| .cleanExpiredFiles(true) |
| .commit()) |
| .isInstanceOf(UnsupportedOperationException.class) |
| .hasMessage("Cannot clean files incrementally when snapshot IDs are specified"); |
| } |
| |
| @TestTemplate |
| public void testMultipleRefsAndCleanExpiredFilesFailsForIncrementalCleanup() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| table.newDelete().deleteFile(FILE_A).commit(); |
| table.manageSnapshots().createTag("TagA", table.currentSnapshot().snapshotId()).commit(); |
| waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots(); |
| |
| assertThatThrownBy( |
| () -> |
| removeSnapshots |
| .withIncrementalCleanup(true) |
| .expireOlderThan(table.currentSnapshot().timestampMillis()) |
| .cleanExpiredFiles(true) |
| .commit()) |
| .isInstanceOf(UnsupportedOperationException.class) |
| .hasMessage("Cannot incrementally clean files for tables with more than 1 ref"); |
| } |
| |
| @TestTemplate |
| public void testExpireWithStatisticsFiles() throws IOException { |
| table.newAppend().appendFile(FILE_A).commit(); |
| String statsFileLocation1 = statsFileLocation(table.location()); |
| StatisticsFile statisticsFile1 = |
| writeStatsFile( |
| table.currentSnapshot().snapshotId(), |
| table.currentSnapshot().sequenceNumber(), |
| statsFileLocation1, |
| table.io()); |
| commitStats(table, statisticsFile1); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| String statsFileLocation2 = statsFileLocation(table.location()); |
| StatisticsFile statisticsFile2 = |
| writeStatsFile( |
| table.currentSnapshot().snapshotId(), |
| table.currentSnapshot().sequenceNumber(), |
| statsFileLocation2, |
| table.io()); |
| commitStats(table, statisticsFile2); |
| assertThat(table.statisticsFiles()).hasSize(2); |
| |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| removeSnapshots(table).expireOlderThan(tAfterCommits).commit(); |
| |
| // only the current snapshot and its stats file should be retained |
| assertThat(table.snapshots()).hasSize(1); |
| assertThat(table.statisticsFiles()) |
| .hasSize(1) |
| .extracting(StatisticsFile::snapshotId) |
| .as("Should contain only the statistics file of snapshot2") |
| .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); |
| |
| assertThat(new File(statsFileLocation1)).doesNotExist(); |
| assertThat(new File(statsFileLocation2)).exists(); |
| } |
| |
| @TestTemplate |
| public void testExpireWithStatisticsFilesWithReuse() throws IOException { |
| table.newAppend().appendFile(FILE_A).commit(); |
| String statsFileLocation1 = statsFileLocation(table.location()); |
| StatisticsFile statisticsFile1 = |
| writeStatsFile( |
| table.currentSnapshot().snapshotId(), |
| table.currentSnapshot().sequenceNumber(), |
| statsFileLocation1, |
| table.io()); |
| commitStats(table, statisticsFile1); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| // If an expired snapshot's stats file is reused for some reason by the live snapshots, |
| // that stats file should not get deleted from the file system as the live snapshots still |
| // reference it. |
| StatisticsFile statisticsFile2 = |
| reuseStatsFile(table.currentSnapshot().snapshotId(), statisticsFile1); |
| commitStats(table, statisticsFile2); |
| |
| assertThat(table.statisticsFiles()).hasSize(2); |
| |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| removeSnapshots(table).expireOlderThan(tAfterCommits).commit(); |
| |
| // only the current snapshot and its stats file (reused from previous snapshot) should be |
| // retained |
| assertThat(table.snapshots()).hasSize(1); |
| assertThat(table.statisticsFiles()) |
| .hasSize(1) |
| .extracting(StatisticsFile::snapshotId) |
| .as("Should contain only the statistics file of snapshot2") |
| .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); |
| // the reused stats file should exist. |
| assertThat(new File(statsFileLocation1)).exists(); |
| } |
| |
| @TestTemplate |
| public void testExpireWithPartitionStatisticsFiles() throws IOException { |
| table.newAppend().appendFile(FILE_A).commit(); |
| String statsFileLocation1 = statsFileLocation(table.location()); |
| PartitionStatisticsFile statisticsFile1 = |
| writePartitionStatsFile( |
| table.currentSnapshot().snapshotId(), statsFileLocation1, table.io()); |
| commitPartitionStats(table, statisticsFile1); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| String statsFileLocation2 = statsFileLocation(table.location()); |
| PartitionStatisticsFile statisticsFile2 = |
| writePartitionStatsFile( |
| table.currentSnapshot().snapshotId(), statsFileLocation2, table.io()); |
| commitPartitionStats(table, statisticsFile2); |
| assertThat(table.partitionStatisticsFiles()).hasSize(2); |
| |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| removeSnapshots(table).expireOlderThan(tAfterCommits).commit(); |
| |
| // only the current snapshot and its stats file should be retained |
| assertThat(table.snapshots()).hasSize(1); |
| assertThat(table.partitionStatisticsFiles()) |
| .hasSize(1) |
| .extracting(PartitionStatisticsFile::snapshotId) |
| .as("Should contain only the statistics file of snapshot2") |
| .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); |
| |
| assertThat(new File(statsFileLocation1)).doesNotExist(); |
| assertThat(new File(statsFileLocation2)).exists(); |
| } |
| |
| @TestTemplate |
| public void testExpireWithPartitionStatisticsFilesWithReuse() throws IOException { |
| table.newAppend().appendFile(FILE_A).commit(); |
| String statsFileLocation1 = statsFileLocation(table.location()); |
| PartitionStatisticsFile statisticsFile1 = |
| writePartitionStatsFile( |
| table.currentSnapshot().snapshotId(), statsFileLocation1, table.io()); |
| commitPartitionStats(table, statisticsFile1); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| // If an expired snapshot's stats file is reused for some reason by the live snapshots, |
| // that stats file should not get deleted from the file system as the live snapshots still |
| // reference it. |
| PartitionStatisticsFile statisticsFile2 = |
| reusePartitionStatsFile(table.currentSnapshot().snapshotId(), statisticsFile1); |
| commitPartitionStats(table, statisticsFile2); |
| |
| assertThat(table.partitionStatisticsFiles()).hasSize(2); |
| |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| removeSnapshots(table).expireOlderThan(tAfterCommits).commit(); |
| |
| // only the current snapshot and its stats file (reused from previous snapshot) should be |
| // retained |
| assertThat(table.snapshots()).hasSize(1); |
| assertThat(table.partitionStatisticsFiles()) |
| .hasSize(1) |
| .extracting(PartitionStatisticsFile::snapshotId) |
| .as("Should contain only the statistics file of snapshot2") |
| .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId())); |
| // the reused stats file should exist. |
| assertThat(new File(statsFileLocation1)).exists(); |
| } |
| |
| @TestTemplate |
| public void testFailRemovingSnapshotWhenStillReferencedByBranch() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| AppendFiles append = table.newAppend().appendFile(FILE_B).stageOnly(); |
| |
| long snapshotId = append.apply().snapshotId(); |
| |
| append.commit(); |
| |
| table.manageSnapshots().createBranch("branch", snapshotId).commit(); |
| |
| assertThatThrownBy(() -> removeSnapshots(table).expireSnapshotId(snapshotId).commit()) |
| .isInstanceOf(IllegalArgumentException.class) |
| .hasMessage("Cannot expire 2. Still referenced by refs: [branch]"); |
| } |
| |
| @TestTemplate |
| public void testFailRemovingSnapshotWhenStillReferencedByTag() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| |
| table.manageSnapshots().createTag("tag", snapshotId).commit(); |
| |
| // commit another snapshot so the first one isn't referenced by main |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| assertThatThrownBy(() -> removeSnapshots(table).expireSnapshotId(snapshotId).commit()) |
| .isInstanceOf(IllegalArgumentException.class) |
| .hasMessage("Cannot expire 1. Still referenced by refs: [tag]"); |
| } |
| |
| @TestTemplate |
| public void testRetainUnreferencedSnapshotsWithinExpirationAge() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| long expireTimestampSnapshotA = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| waitUntilAfter(expireTimestampSnapshotA); |
| |
| table.newAppend().appendFile(FILE_B).stageOnly().commit(); |
| |
| table.newAppend().appendFile(FILE_C).commit(); |
| |
| removeSnapshots(table).expireOlderThan(expireTimestampSnapshotA).commit(); |
| |
| assertThat(table.ops().current().snapshots()).hasSize(2); |
| } |
| |
| @TestTemplate |
| public void testUnreferencedSnapshotParentOfTag() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| long initialSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| // this will be expired because it is still unreferenced with a tag on its child snapshot |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| long expiredSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| long expireTimestampSnapshotB = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| waitUntilAfter(expireTimestampSnapshotB); |
| |
| table.newAppend().appendFile(FILE_C).commit(); |
| |
| // create a tag that references the current history and rewrite main to point to the initial |
| // snapshot |
| table |
| .manageSnapshots() |
| .createTag("tag", table.currentSnapshot().snapshotId()) |
| .replaceBranch("main", initialSnapshotId) |
| .commit(); |
| |
| removeSnapshots(table) |
| .expireOlderThan(expireTimestampSnapshotB) |
| .cleanExpiredFiles(false) |
| .commit(); |
| |
| assertThat(table.snapshot(expiredSnapshotId)) |
| .as("Should remove unreferenced snapshot beneath a tag") |
| .isNull(); |
| assertThat(table.ops().current().snapshots()).hasSize(2); |
| } |
| |
| @TestTemplate |
| public void testSnapshotParentOfBranchNotUnreferenced() { |
| // similar to testUnreferencedSnapshotParentOfTag, but checks that branch history is not |
| // considered unreferenced |
| table.newAppend().appendFile(FILE_A).commit(); |
| |
| long initialSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| // this will be expired because it is still unreferenced with a tag on its child snapshot |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| long snapshotId = table.currentSnapshot().snapshotId(); |
| |
| long expireTimestampSnapshotB = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| waitUntilAfter(expireTimestampSnapshotB); |
| |
| table.newAppend().appendFile(FILE_C).commit(); |
| |
| // create a branch that references the current history and rewrite main to point to the initial |
| // snapshot |
| table |
| .manageSnapshots() |
| .createBranch("branch", table.currentSnapshot().snapshotId()) |
| .setMaxSnapshotAgeMs("branch", Long.MAX_VALUE) |
| .replaceBranch("main", initialSnapshotId) |
| .commit(); |
| |
| removeSnapshots(table) |
| .expireOlderThan(expireTimestampSnapshotB) |
| .cleanExpiredFiles(false) |
| .commit(); |
| |
| assertThat(table.snapshot(snapshotId)) |
| .as("Should not remove snapshot beneath a branch") |
| .isNotNull(); |
| assertThat(table.ops().current().snapshots()).hasSize(3); |
| } |
| |
| @TestTemplate |
| public void testMinSnapshotsToKeepMultipleBranches() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| long initialSnapshotId = table.currentSnapshot().snapshotId(); |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| // stage a snapshot and get its id |
| AppendFiles append = table.newAppend().appendFile(FILE_C).stageOnly(); |
| long branchSnapshotId = append.apply().snapshotId(); |
| append.commit(); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| |
| long maxSnapshotAgeMs = 1; |
| long expirationTime = System.currentTimeMillis() + maxSnapshotAgeMs; |
| |
| // configure main so that the initial snapshot will expire |
| table |
| .manageSnapshots() |
| .setMinSnapshotsToKeep(SnapshotRef.MAIN_BRANCH, 1) |
| .setMaxSnapshotAgeMs(SnapshotRef.MAIN_BRANCH, 1) |
| .commit(); |
| |
| // retain 3 snapshots on branch (including the initial snapshot) |
| table |
| .manageSnapshots() |
| .createBranch("branch", branchSnapshotId) |
| .setMinSnapshotsToKeep("branch", 3) |
| .setMaxSnapshotAgeMs("branch", maxSnapshotAgeMs) |
| .commit(); |
| |
| waitUntilAfter(expirationTime); |
| table.expireSnapshots().cleanExpiredFiles(false).commit(); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| |
| // stop retaining snapshots from the branch |
| table.manageSnapshots().setMinSnapshotsToKeep("branch", 1).commit(); |
| |
| removeSnapshots(table).cleanExpiredFiles(false).commit(); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(table.ops().current().snapshot(initialSnapshotId)).isNull(); |
| } |
| |
| @TestTemplate |
| public void testMaxSnapshotAgeMultipleBranches() { |
| table.newAppend().appendFile(FILE_A).commit(); |
| long initialSnapshotId = table.currentSnapshot().snapshotId(); |
| |
| long ageMs = 10; |
| long expirationTime = System.currentTimeMillis() + ageMs; |
| |
| waitUntilAfter(expirationTime); |
| |
| table.newAppend().appendFile(FILE_B).commit(); |
| |
| // configure main so that the initial snapshot will expire |
| table |
| .manageSnapshots() |
| .setMaxSnapshotAgeMs(SnapshotRef.MAIN_BRANCH, ageMs) |
| .setMinSnapshotsToKeep(SnapshotRef.MAIN_BRANCH, 1) |
| .commit(); |
| |
| // stage a snapshot and get its id |
| AppendFiles append = table.newAppend().appendFile(FILE_C).stageOnly(); |
| long branchSnapshotId = append.apply().snapshotId(); |
| append.commit(); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| |
| // retain all snapshots on branch (including the initial snapshot) |
| table |
| .manageSnapshots() |
| .createBranch("branch", branchSnapshotId) |
| .setMinSnapshotsToKeep("branch", 1) |
| .setMaxSnapshotAgeMs("branch", Long.MAX_VALUE) |
| .commit(); |
| |
| removeSnapshots(table).cleanExpiredFiles(false).commit(); |
| |
| assertThat(table.snapshots()).hasSize(3); |
| |
| // allow the initial snapshot to age off from branch |
| table.manageSnapshots().setMaxSnapshotAgeMs("branch", ageMs).commit(); |
| |
| table.expireSnapshots().cleanExpiredFiles(false).commit(); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(table.ops().current().snapshot(initialSnapshotId)).isNull(); |
| } |
| |
| @TestTemplate |
| public void testRetainFilesOnRetainedBranches() { |
| // Append a file to main and test branch |
| String testBranch = "test-branch"; |
| table.newAppend().appendFile(FILE_A).commit(); |
| Snapshot appendA = table.currentSnapshot(); |
| table.manageSnapshots().createBranch(testBranch, appendA.snapshotId()).commit(); |
| |
| // Delete A from main |
| table.newDelete().deleteFile(FILE_A).commit(); |
| Snapshot deletionA = table.currentSnapshot(); |
| // Add B to main |
| table.newAppend().appendFile(FILE_B).commit(); |
| long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); |
| |
| Set<String> deletedFiles = Sets.newHashSet(); |
| Set<String> expectedDeletes = Sets.newHashSet(); |
| |
| // Only deletionA's manifest list and manifests should be removed |
| expectedDeletes.add(deletionA.manifestListLocation()); |
| expectedDeletes.addAll(manifestPaths(deletionA, table.io())); |
| table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(deletedFiles).isEqualTo(expectedDeletes); |
| |
| // Delete A on test branch |
| table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit(); |
| Snapshot branchDelete = table.snapshot(testBranch); |
| |
| // Append C on test branch |
| table.newAppend().appendFile(FILE_C).toBranch(testBranch).commit(); |
| Snapshot testBranchHead = table.snapshot(testBranch); |
| |
| deletedFiles = Sets.newHashSet(); |
| expectedDeletes = Sets.newHashSet(); |
| |
| waitUntilAfter(testBranchHead.timestampMillis()); |
| table |
| .expireSnapshots() |
| .expireOlderThan(testBranchHead.timestampMillis()) |
| .deleteWith(deletedFiles::add) |
| .commit(); |
| |
| expectedDeletes.add(appendA.manifestListLocation()); |
| expectedDeletes.addAll(manifestPaths(appendA, table.io())); |
| expectedDeletes.add(branchDelete.manifestListLocation()); |
| expectedDeletes.addAll(manifestPaths(branchDelete, table.io())); |
| expectedDeletes.add(FILE_A.path().toString()); |
| |
| assertThat(table.snapshots()).hasSize(2); |
| assertThat(deletedFiles).isEqualTo(expectedDeletes); |
| } |
| |
| private Set<String> manifestPaths(Snapshot snapshot, FileIO io) { |
| return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet()); |
| } |
| |
| private RemoveSnapshots removeSnapshots(Table table) { |
| RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots(); |
| return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup); |
| } |
| |
| private StatisticsFile writeStatsFile( |
| long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO) |
| throws IOException { |
| try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) { |
| puffinWriter.add( |
| new Blob( |
| "some-blob-type", |
| ImmutableList.of(1), |
| snapshotId, |
| snapshotSequenceNumber, |
| ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8)))); |
| puffinWriter.finish(); |
| |
| return new GenericStatisticsFile( |
| snapshotId, |
| statsLocation, |
| puffinWriter.fileSize(), |
| puffinWriter.footerSize(), |
| puffinWriter.writtenBlobsMetadata().stream() |
| .map(GenericBlobMetadata::from) |
| .collect(ImmutableList.toImmutableList())); |
| } |
| } |
| |
| private StatisticsFile reuseStatsFile(long snapshotId, StatisticsFile statisticsFile) { |
| return new GenericStatisticsFile( |
| snapshotId, |
| statisticsFile.path(), |
| statisticsFile.fileSizeInBytes(), |
| statisticsFile.fileFooterSizeInBytes(), |
| statisticsFile.blobMetadata()); |
| } |
| |
| private void commitStats(Table table, StatisticsFile statisticsFile) { |
| table.updateStatistics().setStatistics(statisticsFile.snapshotId(), statisticsFile).commit(); |
| } |
| |
| private String statsFileLocation(String tableLocation) { |
| String statsFileName = "stats-file-" + UUID.randomUUID(); |
| return tableLocation + "/metadata/" + statsFileName; |
| } |
| |
| private static PartitionStatisticsFile writePartitionStatsFile( |
| long snapshotId, String statsLocation, FileIO fileIO) { |
| PositionOutputStream positionOutputStream; |
| try { |
| positionOutputStream = fileIO.newOutputFile(statsLocation).create(); |
| positionOutputStream.close(); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| |
| return ImmutableGenericPartitionStatisticsFile.builder() |
| .snapshotId(snapshotId) |
| .fileSizeInBytes(42L) |
| .path(statsLocation) |
| .build(); |
| } |
| |
| private static PartitionStatisticsFile reusePartitionStatsFile( |
| long snapshotId, PartitionStatisticsFile statisticsFile) { |
| return ImmutableGenericPartitionStatisticsFile.builder() |
| .path(statisticsFile.path()) |
| .fileSizeInBytes(statisticsFile.fileSizeInBytes()) |
| .snapshotId(snapshotId) |
| .build(); |
| } |
| |
| private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) { |
| table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); |
| } |
| } |