HDDS-14942. Implement manifest selection logic for rewrite based on snapshot delta (#10145)
diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
index 09bea8b..a3e9190 100644
--- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
+++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
@@ -22,10 +22,21 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.GenericPartitionFieldSummary;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.InternalData;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
@@ -37,6 +48,7 @@
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.actions.ImmutableRewriteTablePath;
import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.Pair;
/**
@@ -57,11 +69,15 @@ public class RewriteTablePathOzoneAction implements RewriteTablePath {
private String stagingDir;
private int parallelism;
+ private ExecutorService executorService;
+ private static final int MAX_INFLIGHT_MULTIPLIER = 4;
+ private static final int DEFAULT_THREAD_COUNT = 10;
+
private final Table table;
public RewriteTablePathOzoneAction(Table table) {
this.table = table;
- this.parallelism = Runtime.getRuntime().availableProcessors();
+ this.parallelism = DEFAULT_THREAD_COUNT;
}
public RewriteTablePathOzoneAction(Table table, int parallelism) {
@@ -102,8 +118,7 @@ public RewriteTablePath stagingLocation(String stagingLocation) {
@Override
public Result execute() {
validateInputs();
- // TODO: should use for parallel manifest and position delete file rewriting.
- ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
+ executorService = Executors.newFixedThreadPool(parallelism);
try {
return doExecute();
} finally {
@@ -197,6 +212,9 @@ private boolean versionInFilePath(String path, String version) {
private String rebuildMetadata() {
//TODO need to implement rewrite of manifest list , manifest files and position delete files.
+ TableMetadata startMetadata = startVersionName != null
+ ? new StaticTableOperations(startVersionName, table.io()).current()
+ : null;
TableMetadata endMetadata = new StaticTableOperations(endVersionName, table.io()).current();
List<PartitionStatisticsFile> partitionStats = endMetadata.partitionStatisticsFiles();
@@ -205,6 +223,13 @@ private String rebuildMetadata() {
}
RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
+ Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
+ Set<Long> deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ Set<Snapshot> validSnapshots = new HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata));
+ validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata));
+    //TODO: manifestsToRewrite will be used when rewriting the manifest-list files.
+ Set<String> manifestsToRewrite = manifestsToRewrite(validSnapshots,
+ startMetadata != null ? deltaSnapshotIds : null);
Set<Pair<String, String>> copyPlan = new HashSet<>();
copyPlan.addAll(rewriteVersionResult.copyPlan());
@@ -251,4 +276,101 @@ private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, Str
return result;
}
+
+ private Set<String> manifestsToRewrite(Set<Snapshot> validSnapshots, Set<Long> deltaSnapshotIds) {
+
+ Set<String> manifestPaths = ConcurrentHashMap.newKeySet();
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+
+ ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
+
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (Snapshot snapshot : validSnapshots) {
+ semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight
+
+ final long snapshotId = snapshot.snapshotId();
+ final String manifestListLocation = snapshot.manifestListLocation();
+
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try (CloseableIterable<ManifestFile> manifests =
+ InternalData.read(
+ FileFormat.AVRO,
+ table.io().newInputFile(manifestListLocation))
+ .setRootType(GenericManifestFile.class)
+ .setCustomType(
+ ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
+ GenericPartitionFieldSummary.class)
+ .project(ManifestFile.schema())
+ .build()) {
+
+ for (ManifestFile manifest : manifests) {
+ if (deltaSnapshotIds == null) {
+ manifestPaths.add(manifest.path());
+ } else if (manifest.snapshotId() != null
+ && deltaSnapshotIds.contains(manifest.snapshotId())) {
+ manifestPaths.add(manifest.path());
+ }
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to read manifests for snapshot " + snapshotId, e);
+ } finally {
+ semaphore.release();
+ }
+ return null;
+ });
+ taskSubmitted = true;
+ submittedTasks++;
+ } finally {
+ if (!taskSubmitted) {
+ semaphore.release();
+ }
+ }
+ Future<Void> done;
+ while ((done = completionService.poll()) != null) {
+ done.get();
+ completedTasks++;
+ }
+ }
+
+ while (completedTasks < submittedTasks) {
+ completionService.take().get();
+ completedTasks++;
+ }
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ executorService.shutdownNow();
+ throw new RuntimeException("Interrupted while processing manifests", e);
+
+ } catch (ExecutionException e) {
+ executorService.shutdownNow();
+ throw new RuntimeException(
+ "Failed to collect manifests to rewrite. "
+ + "The end version may contain invalid snapshots. "
+ + "Please choose an earlier version.",
+ e.getCause());
+ }
+
+ return manifestPaths;
+ }
+
+ private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
+ if (startMetadata == null) {
+ return allSnapshots;
+ } else {
+ Set<Long> startSnapshotIds =
+ startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ return allSnapshots.stream()
+ .filter(s -> !startSnapshotIds.contains(s.snapshotId()))
+ .collect(Collectors.toSet());
+ }
+ }
}
diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java
index e7a36da..97f129e 100644
--- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java
+++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java
@@ -27,8 +27,10 @@
import java.util.Set;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
@@ -113,4 +115,12 @@ static void writeAsCsv(Set<Pair<String, String>> rows, OutputFile outputFile) {
throw new RuntimeIOException(e);
}
}
+
+ static Set<Snapshot> snapshotSet(TableMetadata metadata) {
+ if (metadata == null) {
+ return new HashSet<>();
+ } else {
+ return new HashSet<>(metadata.snapshots());
+ }
+ }
}