| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg; |
| |
| import com.github.benmanes.caffeine.cache.Caffeine; |
| import com.github.benmanes.caffeine.cache.LoadingCache; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Consumer; |
| import org.apache.iceberg.events.Listeners; |
| import org.apache.iceberg.exceptions.CommitFailedException; |
| import org.apache.iceberg.exceptions.RuntimeIOException; |
| import org.apache.iceberg.io.OutputFile; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.apache.iceberg.util.Exceptions; |
| import org.apache.iceberg.util.Tasks; |
| import org.apache.iceberg.util.ThreadPools; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; |
| import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; |
| import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; |
| import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; |
| import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; |
| import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; |
| import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; |
| import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; |
| import static org.apache.iceberg.TableProperties.MANIFEST_LISTS_ENABLED; |
| import static org.apache.iceberg.TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT; |
| |
| abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> { |
| private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class); |
| static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet(); |
| |
| /** |
| * Default callback used to delete files. |
| */ |
| private final Consumer<String> defaultDelete = new Consumer<String>() { |
| @Override |
| public void accept(String file) { |
| ops.io().deleteFile(file); |
| } |
| }; |
| |
| /** |
| * Cache used to enrich ManifestFile instances that are written to a ManifestListWriter. |
| */ |
| private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata; |
| |
| private final TableOperations ops; |
| private final String commitUUID = UUID.randomUUID().toString(); |
| private final AtomicInteger manifestCount = new AtomicInteger(0); |
| private final AtomicInteger attempt = new AtomicInteger(0); |
| private final List<String> manifestLists = Lists.newArrayList(); |
| private volatile Long snapshotId = null; |
| private TableMetadata base; |
| private boolean stageOnly = false; |
| private Consumer<String> deleteFunc = defaultDelete; |
| |
| protected SnapshotProducer(TableOperations ops) { |
| this.ops = ops; |
| this.base = ops.current(); |
| this.manifestsWithMetadata = Caffeine |
| .newBuilder() |
| .build(file -> { |
| if (file.snapshotId() != null) { |
| return file; |
| } |
| return addMetadata(ops, file); |
| }); |
| } |
| |
| protected abstract ThisT self(); |
| |
| @Override |
| public ThisT stageOnly() { |
| this.stageOnly = true; |
| return self(); |
| } |
| |
| @Override |
| public ThisT deleteWith(Consumer<String> deleteCallback) { |
| Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once"); |
| this.deleteFunc = deleteCallback; |
| return self(); |
| } |
| |
| /** |
| * Clean up any uncommitted manifests that were created. |
| * <p> |
| * Manifests may not be committed if apply is called more because a commit conflict has occurred. |
| * Implementations may keep around manifests because the same changes will be made by both apply |
| * calls. This method instructs the implementation to clean up those manifests and passes the |
| * paths of the manifests that were actually committed. |
| * |
| * @param committed a set of manifest paths that were actually committed |
| */ |
| protected abstract void cleanUncommitted(Set<ManifestFile> committed); |
| |
| /** |
| * A string that describes the action that produced the new snapshot. |
| * |
| * @return a string operation |
| */ |
| protected abstract String operation(); |
| |
| /** |
| * Apply the update's changes to the base table metadata and return the new manifest list. |
| * |
| * @param metadataToUpdate the base table metadata to apply changes to |
| * @return a manifest list for the new snapshot. |
| */ |
| protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate); |
| |
| @Override |
| public Snapshot apply() { |
| this.base = refresh(); |
| Long parentSnapshotId = base.currentSnapshot() != null ? |
| base.currentSnapshot().snapshotId() : null; |
| long sequenceNumber = base.nextSequenceNumber(); |
| |
| List<ManifestFile> manifests = apply(base); |
| |
| if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) { |
| OutputFile manifestList = manifestListPath(); |
| |
| try (ManifestListWriter writer = ManifestLists.write( |
| ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) { |
| |
| // keep track of the manifest lists created |
| manifestLists.add(manifestList.location()); |
| |
| ManifestFile[] manifestFiles = new ManifestFile[manifests.size()]; |
| |
| Tasks.range(manifestFiles.length) |
| .stopOnFailure().throwFailureWhenFinished() |
| .executeWith(ThreadPools.getWorkerPool()) |
| .run(index -> |
| manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index))); |
| |
| writer.addAll(Arrays.asList(manifestFiles)); |
| |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to write manifest list file"); |
| } |
| |
| return new BaseSnapshot(ops.io(), |
| sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), |
| manifestList.location()); |
| |
| } else { |
| return new BaseSnapshot(ops.io(), |
| snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), |
| manifests); |
| } |
| } |
| |
| protected abstract Map<String, String> summary(); |
| |
| /** |
| * Returns the snapshot summary from the implementation and updates totals. |
| */ |
| private Map<String, String> summary(TableMetadata previous) { |
| Map<String, String> summary = summary(); |
| |
| if (summary == null) { |
| return ImmutableMap.of(); |
| } |
| |
| Map<String, String> previousSummary; |
| if (previous.currentSnapshot() != null) { |
| if (previous.currentSnapshot().summary() != null) { |
| previousSummary = previous.currentSnapshot().summary(); |
| } else { |
| // previous snapshot had no summary, use an empty summary |
| previousSummary = ImmutableMap.of(); |
| } |
| } else { |
| // if there was no previous snapshot, default the summary to start totals at 0 |
| previousSummary = ImmutableMap.of( |
| SnapshotSummary.TOTAL_RECORDS_PROP, "0", |
| SnapshotSummary.TOTAL_DATA_FILES_PROP, "0", |
| SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0", |
| SnapshotSummary.TOTAL_POS_DELETES_PROP, "0", |
| SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0"); |
| } |
| |
| ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); |
| |
| // copy all summary properties from the implementation |
| builder.putAll(summary); |
| |
| updateTotal( |
| builder, previousSummary, SnapshotSummary.TOTAL_RECORDS_PROP, |
| summary, SnapshotSummary.ADDED_RECORDS_PROP, SnapshotSummary.DELETED_RECORDS_PROP); |
| updateTotal( |
| builder, previousSummary, SnapshotSummary.TOTAL_DATA_FILES_PROP, |
| summary, SnapshotSummary.ADDED_FILES_PROP, SnapshotSummary.DELETED_FILES_PROP); |
| updateTotal( |
| builder, previousSummary, SnapshotSummary.TOTAL_DELETE_FILES_PROP, |
| summary, SnapshotSummary.ADDED_DELETE_FILES_PROP, SnapshotSummary.REMOVED_DELETE_FILES_PROP); |
| updateTotal( |
| builder, previousSummary, SnapshotSummary.TOTAL_POS_DELETES_PROP, |
| summary, SnapshotSummary.ADDED_POS_DELETES_PROP, SnapshotSummary.REMOVED_POS_DELETES_PROP); |
| updateTotal( |
| builder, previousSummary, SnapshotSummary.TOTAL_EQ_DELETES_PROP, |
| summary, SnapshotSummary.ADDED_EQ_DELETES_PROP, SnapshotSummary.REMOVED_EQ_DELETES_PROP); |
| |
| return builder.build(); |
| } |
| |
| protected TableMetadata current() { |
| return base; |
| } |
| |
| protected TableMetadata refresh() { |
| this.base = ops.refresh(); |
| return base; |
| } |
| |
| @Override |
| public void commit() { |
| // this is always set to the latest commit attempt's snapshot id. |
| AtomicLong newSnapshotId = new AtomicLong(-1L); |
| try { |
| Tasks.foreach(ops) |
| .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) |
| .exponentialBackoff( |
| base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), |
| base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), |
| base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), |
| 2.0 /* exponential */) |
| .onlyRetryOn(CommitFailedException.class) |
| .run(taskOps -> { |
| Snapshot newSnapshot = apply(); |
| newSnapshotId.set(newSnapshot.snapshotId()); |
| TableMetadata updated; |
| if (stageOnly) { |
| updated = base.addStagedSnapshot(newSnapshot); |
| } else { |
| updated = base.replaceCurrentSnapshot(newSnapshot); |
| } |
| |
| if (updated == base) { |
| // do not commit if the metadata has not changed. for example, this may happen when setting the current |
| // snapshot to an ID that is already current. note that this check uses identity. |
| return; |
| } |
| |
| // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries |
| // to ensure that if a concurrent operation assigns the UUID, this operation will not fail. |
| taskOps.commit(base, updated.withUUID()); |
| }); |
| |
| } catch (RuntimeException e) { |
| Exceptions.suppressAndThrow(e, this::cleanAll); |
| } |
| |
| LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName()); |
| |
| try { |
| // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by |
| // id in case another commit was added between this commit and the refresh. |
| Snapshot saved = ops.refresh().snapshot(newSnapshotId.get()); |
| if (saved != null) { |
| cleanUncommitted(Sets.newHashSet(saved.allManifests())); |
| // also clean up unused manifest lists created by multiple attempts |
| for (String manifestList : manifestLists) { |
| if (!saved.manifestListLocation().equals(manifestList)) { |
| deleteFile(manifestList); |
| } |
| } |
| } else { |
| // saved may not be present if the latest metadata couldn't be loaded due to eventual |
| // consistency problems in refresh. in that case, don't clean up. |
| LOG.warn("Failed to load committed snapshot, skipping manifest clean-up"); |
| } |
| |
| } catch (RuntimeException e) { |
| LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e); |
| } |
| |
| notifyListeners(); |
| } |
| |
| private void notifyListeners() { |
| try { |
| Object event = updateEvent(); |
| if (event != null) { |
| Listeners.notifyAll(event); |
| } |
| } catch (RuntimeException e) { |
| LOG.warn("Failed to notify listeners", e); |
| } |
| } |
| |
| protected void cleanAll() { |
| for (String manifestList : manifestLists) { |
| deleteFile(manifestList); |
| } |
| manifestLists.clear(); |
| cleanUncommitted(EMPTY_SET); |
| } |
| |
| protected void deleteFile(String path) { |
| deleteFunc.accept(path); |
| } |
| |
| protected OutputFile manifestListPath() { |
| return ops.io().newOutputFile(ops.metadataFileLocation(FileFormat.AVRO.addExtension( |
| String.format("snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID)))); |
| } |
| |
| protected OutputFile newManifestOutput() { |
| return ops.io().newOutputFile( |
| ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()))); |
| } |
| |
| protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) { |
| return ManifestFiles.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); |
| } |
| |
| protected ManifestWriter<DeleteFile> newDeleteManifestWriter(PartitionSpec spec) { |
| return ManifestFiles.writeDeleteManifest(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); |
| } |
| |
| protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) { |
| return ManifestFiles.read(manifest, ops.io(), ops.current().specsById()); |
| } |
| |
| protected ManifestReader<DeleteFile> newDeleteManifestReader(ManifestFile manifest) { |
| return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById()); |
| } |
| |
| protected long snapshotId() { |
| if (snapshotId == null) { |
| synchronized (this) { |
| if (snapshotId == null) { |
| this.snapshotId = ops.newSnapshotId(); |
| } |
| } |
| } |
| return snapshotId; |
| } |
| |
| private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { |
| try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { |
| PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); |
| int addedFiles = 0; |
| long addedRows = 0L; |
| int existingFiles = 0; |
| long existingRows = 0L; |
| int deletedFiles = 0; |
| long deletedRows = 0L; |
| |
| Long snapshotId = null; |
| long maxSnapshotId = Long.MIN_VALUE; |
| for (ManifestEntry<DataFile> entry : reader.entries()) { |
| if (entry.snapshotId() > maxSnapshotId) { |
| maxSnapshotId = entry.snapshotId(); |
| } |
| |
| switch (entry.status()) { |
| case ADDED: |
| addedFiles += 1; |
| addedRows += entry.file().recordCount(); |
| if (snapshotId == null) { |
| snapshotId = entry.snapshotId(); |
| } |
| break; |
| case EXISTING: |
| existingFiles += 1; |
| existingRows += entry.file().recordCount(); |
| break; |
| case DELETED: |
| deletedFiles += 1; |
| deletedRows += entry.file().recordCount(); |
| if (snapshotId == null) { |
| snapshotId = entry.snapshotId(); |
| } |
| break; |
| } |
| |
| stats.update(entry.file().partition()); |
| } |
| |
| if (snapshotId == null) { |
| // if no files were added or deleted, use the largest snapshot ID in the manifest |
| snapshotId = maxSnapshotId; |
| } |
| |
| return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(), |
| ManifestContent.DATA, manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, |
| addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); |
| |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path()); |
| } |
| } |
| |
| private static void updateTotal(ImmutableMap.Builder<String, String> summaryBuilder, |
| Map<String, String> previousSummary, String totalProperty, |
| Map<String, String> currentSummary, |
| String addedProperty, String deletedProperty) { |
| String totalStr = previousSummary.get(totalProperty); |
| if (totalStr != null) { |
| try { |
| long newTotal = Long.parseLong(totalStr); |
| |
| String addedStr = currentSummary.get(addedProperty); |
| if (newTotal >= 0 && addedStr != null) { |
| newTotal += Long.parseLong(addedStr); |
| } |
| |
| String deletedStr = currentSummary.get(deletedProperty); |
| if (newTotal >= 0 && deletedStr != null) { |
| newTotal -= Long.parseLong(deletedStr); |
| } |
| |
| if (newTotal >= 0) { |
| summaryBuilder.put(totalProperty, String.valueOf(newTotal)); |
| } |
| |
| } catch (NumberFormatException e) { |
| // ignore and do not add total |
| } |
| } |
| } |
| } |