| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.function.Consumer; |
| import org.apache.iceberg.exceptions.RuntimeIOException; |
| import org.apache.iceberg.io.CloseableIterable; |
| import org.apache.iceberg.io.FileIO; |
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.apache.iceberg.util.PropertyUtil; |
| import org.apache.iceberg.util.SnapshotUtil; |
| import org.apache.iceberg.util.Tasks; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| class IncrementalFileCleanup extends FileCleanupStrategy { |
| private static final Logger LOG = LoggerFactory.getLogger(IncrementalFileCleanup.class); |
| |
| IncrementalFileCleanup( |
| FileIO fileIO, |
| ExecutorService deleteExecutorService, |
| ExecutorService planExecutorService, |
| Consumer<String> deleteFunc) { |
| super(fileIO, deleteExecutorService, planExecutorService, deleteFunc); |
| } |
| |
| @Override |
| @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) |
| public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) { |
| if (afterExpiration.refs().size() > 1) { |
| throw new UnsupportedOperationException( |
| "Cannot incrementally clean files for tables with more than 1 ref"); |
| } |
| |
| // clean up the expired snapshots: |
| // 1. Get a list of the snapshots that were removed |
| // 2. Delete any data files that were deleted by those snapshots and are not in the table |
| // 3. Delete any manifests that are no longer used by current snapshots |
| // 4. Delete the manifest lists |
| |
| Set<Long> validIds = Sets.newHashSet(); |
| for (Snapshot snapshot : afterExpiration.snapshots()) { |
| validIds.add(snapshot.snapshotId()); |
| } |
| |
| Set<Long> expiredIds = Sets.newHashSet(); |
| for (Snapshot snapshot : beforeExpiration.snapshots()) { |
| long snapshotId = snapshot.snapshotId(); |
| if (!validIds.contains(snapshotId)) { |
| // the snapshot was expired |
| LOG.info("Expired snapshot: {}", snapshot); |
| expiredIds.add(snapshotId); |
| } |
| } |
| |
| if (expiredIds.isEmpty()) { |
| // if no snapshots were expired, skip cleanup |
| return; |
| } |
| |
| SnapshotRef branchToCleanup = Iterables.getFirst(beforeExpiration.refs().values(), null); |
| if (branchToCleanup == null) { |
| return; |
| } |
| |
| Snapshot latest = beforeExpiration.snapshot(branchToCleanup.snapshotId()); |
| List<Snapshot> snapshots = afterExpiration.snapshots(); |
| |
| // this is the set of ancestors of the current table state. when removing snapshots, this must |
| // only remove files that were deleted in an ancestor of the current table state to avoid |
| // physically deleting files that were logically deleted in a commit that was rolled back. |
| Set<Long> ancestorIds = |
| Sets.newHashSet(SnapshotUtil.ancestorIds(latest, beforeExpiration::snapshot)); |
| |
| Set<Long> pickedAncestorSnapshotIds = Sets.newHashSet(); |
| for (long snapshotId : ancestorIds) { |
| String sourceSnapshotId = |
| beforeExpiration |
| .snapshot(snapshotId) |
| .summary() |
| .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP); |
| if (sourceSnapshotId != null) { |
| // protect any snapshot that was cherry-picked into the current table state |
| pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId)); |
| } |
| } |
| |
| // find manifests to clean up that are still referenced by a valid snapshot, but written by an |
| // expired snapshot |
| Set<String> validManifests = Sets.newHashSet(); |
| Set<ManifestFile> manifestsToScan = Sets.newHashSet(); |
| |
| // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete |
| // as much of the delete work as possible and avoid orphaned data or manifest files. |
| Tasks.foreach(snapshots) |
| .retry(3) |
| .suppressFailureWhenFinished() |
| .onFailure( |
| (snapshot, exc) -> |
| LOG.warn( |
| "Failed on snapshot {} while reading manifest list: {}", |
| snapshot.snapshotId(), |
| snapshot.manifestListLocation(), |
| exc)) |
| .run( |
| snapshot -> { |
| try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) { |
| for (ManifestFile manifest : manifests) { |
| validManifests.add(manifest.path()); |
| |
| long snapshotId = manifest.snapshotId(); |
| // whether the manifest was created by a valid snapshot (true) or an expired |
| // snapshot (false) |
| boolean fromValidSnapshots = validIds.contains(snapshotId); |
| // whether the snapshot that created the manifest was an ancestor of the table |
| // state |
| boolean isFromAncestor = ancestorIds.contains(snapshotId); |
| // whether the changes in this snapshot have been picked into the current table |
| // state |
| boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId); |
| // if the snapshot that wrote this manifest is no longer valid (has expired), |
| // then delete its deleted files. note that this is only for expired snapshots |
| // that are in the |
| // current table state |
| if (!fromValidSnapshots |
| && (isFromAncestor || isPicked) |
| && manifest.hasDeletedFiles()) { |
| manifestsToScan.add(manifest.copy()); |
| } |
| } |
| |
| } catch (IOException e) { |
| throw new RuntimeIOException( |
| e, "Failed to close manifest list: %s", snapshot.manifestListLocation()); |
| } |
| }); |
| |
| // find manifests to clean up that were only referenced by snapshots that have expired |
| Set<String> manifestListsToDelete = Sets.newHashSet(); |
| Set<String> manifestsToDelete = Sets.newHashSet(); |
| Set<ManifestFile> manifestsToRevert = Sets.newHashSet(); |
| Tasks.foreach(beforeExpiration.snapshots()) |
| .retry(3) |
| .suppressFailureWhenFinished() |
| .onFailure( |
| (snapshot, exc) -> |
| LOG.warn( |
| "Failed on snapshot {} while reading manifest list: {}", |
| snapshot.snapshotId(), |
| snapshot.manifestListLocation(), |
| exc)) |
| .run( |
| snapshot -> { |
| long snapshotId = snapshot.snapshotId(); |
| if (!validIds.contains(snapshotId)) { |
| // determine whether the changes in this snapshot are in the current table state |
| if (pickedAncestorSnapshotIds.contains(snapshotId)) { |
| // this snapshot was cherry-picked into the current table state, so skip cleaning |
| // it up. |
| // its changes will expire when the picked snapshot expires. |
| // A -- C -- D (source=B) |
| // `- B <-- this commit |
| return; |
| } |
| |
| long sourceSnapshotId = |
| PropertyUtil.propertyAsLong( |
| snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1); |
| if (ancestorIds.contains(sourceSnapshotId)) { |
| // this commit was cherry-picked from a commit that is in the current table state. |
| // do not clean up its changes because it would revert data file additions that |
| // are in the current |
| // table. |
| // A -- B -- C |
| // `- D (source=B) <-- this commit |
| return; |
| } |
| |
| if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) { |
| // this commit was cherry-picked from a commit that is in the current table state. |
| // do not clean up its changes because it would revert data file additions that |
| // are in the current |
| // table. |
| // A -- C -- E (source=B) |
| // `- B `- D (source=B) <-- this commit |
| return; |
| } |
| |
| // find any manifests that are no longer needed |
| try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) { |
| for (ManifestFile manifest : manifests) { |
| if (!validManifests.contains(manifest.path())) { |
| manifestsToDelete.add(manifest.path()); |
| |
| boolean isFromAncestor = ancestorIds.contains(manifest.snapshotId()); |
| boolean isFromExpiringSnapshot = expiredIds.contains(manifest.snapshotId()); |
| |
| if (isFromAncestor && manifest.hasDeletedFiles()) { |
| // Only delete data files that were deleted in by an expired snapshot if |
| // that napshot is an ancestor of the current table state. Otherwise, a |
| // snapshot |
| // that deleted files and was rolled back will delete files that could be in |
| // the current |
| // table state. |
| manifestsToScan.add(manifest.copy()); |
| } |
| |
| if (!isFromAncestor && isFromExpiringSnapshot && manifest.hasAddedFiles()) { |
| // Because the manifest was written by a snapshot that is not an ancestor of |
| // the current table state, the files added in this manifest can be removed. |
| // The |
| // extra check whether the manifest was written by a known snapshot that was |
| // expired in this commit ensures that the full ancestor list between when |
| // the snapshot |
| // was written and this expiration is known and there is no missing history. |
| // If |
| // history were missing, then the snapshot could be an ancestor of the table |
| // state |
| // but the ancestor ID set would not contain it and this would be unsafe. |
| manifestsToRevert.add(manifest.copy()); |
| } |
| } |
| } |
| } catch (IOException e) { |
| throw new RuntimeIOException( |
| e, "Failed to close manifest list: %s", snapshot.manifestListLocation()); |
| } |
| |
| // add the manifest list to the delete set, if present |
| if (snapshot.manifestListLocation() != null) { |
| manifestListsToDelete.add(snapshot.manifestListLocation()); |
| } |
| } |
| }); |
| |
| Set<String> filesToDelete = |
| findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); |
| |
| deleteFiles(filesToDelete, "data"); |
| deleteFiles(manifestsToDelete, "manifest"); |
| deleteFiles(manifestListsToDelete, "manifest list"); |
| |
| if (!beforeExpiration.statisticsFiles().isEmpty()) { |
| Set<String> expiredStatisticsFilesLocations = |
| expiredStatisticsFilesLocations(beforeExpiration, afterExpiration); |
| deleteFiles(expiredStatisticsFilesLocations, "statistics files"); |
| } |
| } |
| |
| private Set<String> findFilesToDelete( |
| Set<ManifestFile> manifestsToScan, |
| Set<ManifestFile> manifestsToRevert, |
| Set<Long> validIds, |
| TableMetadata current) { |
| Set<String> filesToDelete = ConcurrentHashMap.newKeySet(); |
| Tasks.foreach(manifestsToScan) |
| .retry(3) |
| .suppressFailureWhenFinished() |
| .executeWith(planExecutorService) |
| .onFailure( |
| (item, exc) -> |
| LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) |
| .run( |
| manifest -> { |
| // the manifest has deletes, scan it to find files to delete |
| try (ManifestReader<?> reader = |
| ManifestFiles.open(manifest, fileIO, current.specsById())) { |
| for (ManifestEntry<?> entry : reader.entries()) { |
| // if the snapshot ID of the DELETE entry is no longer valid, the data can be |
| // deleted |
| if (entry.status() == ManifestEntry.Status.DELETED |
| && !validIds.contains(entry.snapshotId())) { |
| // use toString to ensure the path will not change (Utf8 is reused) |
| filesToDelete.add(entry.file().path().toString()); |
| } |
| } |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); |
| } |
| }); |
| |
| Tasks.foreach(manifestsToRevert) |
| .retry(3) |
| .suppressFailureWhenFinished() |
| .executeWith(planExecutorService) |
| .onFailure( |
| (item, exc) -> |
| LOG.warn("Failed to get added files: this may cause orphaned data files", exc)) |
| .run( |
| manifest -> { |
| // the manifest has deletes, scan it to find files to delete |
| try (ManifestReader<?> reader = |
| ManifestFiles.open(manifest, fileIO, current.specsById())) { |
| for (ManifestEntry<?> entry : reader.entries()) { |
| // delete any ADDED file from manifests that were reverted |
| if (entry.status() == ManifestEntry.Status.ADDED) { |
| // use toString to ensure the path will not change (Utf8 is reused) |
| filesToDelete.add(entry.file().path().toString()); |
| } |
| } |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); |
| } |
| }); |
| |
| return filesToDelete; |
| } |
| } |