HDDS-14942. Implement manifest selection logic for rewrite based on snapshot delta (#10145)
diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
index 09bea8b..a3e9190 100644
--- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
+++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java
@@ -22,10 +22,21 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.GenericPartitionFieldSummary;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.InternalData;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.RewriteTablePathUtil;
 import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
@@ -37,6 +48,7 @@
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.actions.ImmutableRewriteTablePath;
 import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.util.Pair;
 
 /**
@@ -57,11 +69,15 @@ public class RewriteTablePathOzoneAction implements RewriteTablePath {
   private String stagingDir;
   private int parallelism;
 
+  private ExecutorService executorService;
+  private static final int MAX_INFLIGHT_MULTIPLIER = 4;
+  private static final int DEFAULT_THREAD_COUNT = 10;
+
   private final Table table;
 
   public RewriteTablePathOzoneAction(Table table) {
     this.table = table;
-    this.parallelism = Runtime.getRuntime().availableProcessors();
+    this.parallelism = DEFAULT_THREAD_COUNT;
   }
 
   public RewriteTablePathOzoneAction(Table table, int parallelism) {
@@ -102,8 +118,7 @@ public RewriteTablePath stagingLocation(String stagingLocation) {
   @Override
   public Result execute() {
     validateInputs();
-    // TODO: should use for parallel manifest and position delete file rewriting.
-    ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
+    executorService = Executors.newFixedThreadPool(parallelism);
     try {
       return doExecute();
     } finally {
@@ -197,6 +212,9 @@ private boolean versionInFilePath(String path, String version) {
 
   private String rebuildMetadata() {
     //TODO need to implement rewrite of manifest list , manifest files and position delete files.
+    TableMetadata startMetadata = startVersionName != null
+        ? new StaticTableOperations(startVersionName, table.io()).current()
+        : null;
     TableMetadata endMetadata = new StaticTableOperations(endVersionName, table.io()).current();
 
     List<PartitionStatisticsFile> partitionStats = endMetadata.partitionStatisticsFiles();
@@ -205,6 +223,13 @@ private String rebuildMetadata() {
     }
 
     RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
+    Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
+    Set<Long> deltaSnapshotIds =  deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+    Set<Snapshot> validSnapshots = new HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata));
+    validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata));
+    // TODO: manifestsToRewrite will be used when rewriting the manifest-list files.
+    Set<String> manifestsToRewrite = manifestsToRewrite(validSnapshots, 
+        startMetadata != null ? deltaSnapshotIds : null);
 
     Set<Pair<String, String>> copyPlan = new HashSet<>();
     copyPlan.addAll(rewriteVersionResult.copyPlan());
@@ -251,4 +276,101 @@ private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, Str
 
     return result;
   }
+
+  private Set<String> manifestsToRewrite(Set<Snapshot> validSnapshots, Set<Long> deltaSnapshotIds) {
+    // Scans each snapshot's manifest list on executorService and returns the selected manifest paths.
+    Set<String> manifestPaths = ConcurrentHashMap.newKeySet(); // tasks add to this from executor threads
+    int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+    Semaphore semaphore = new Semaphore(maxInFlight); // one permit per submitted, unfinished task
+
+    ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
+
+    int submittedTasks = 0;
+    int completedTasks = 0;
+
+    try {
+      for (Snapshot snapshot : validSnapshots) {
+        semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight
+
+        final long snapshotId = snapshot.snapshotId();
+        final String manifestListLocation = snapshot.manifestListLocation();
+        // taskSubmitted stays false if submit() throws, so the outer finally can return the permit.
+        boolean taskSubmitted = false;
+        try {
+          completionService.submit(() -> {
+            try (CloseableIterable<ManifestFile> manifests =
+                     InternalData.read(
+                             FileFormat.AVRO,
+                             table.io().newInputFile(manifestListLocation))
+                         .setRootType(GenericManifestFile.class)
+                         .setCustomType(
+                             ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
+                             GenericPartitionFieldSummary.class)
+                         .project(ManifestFile.schema())
+                         .build()) {
+              // Null deltaSnapshotIds keeps every manifest; otherwise only those whose snapshotId is in the set.
+              for (ManifestFile manifest : manifests) {
+                if (deltaSnapshotIds == null) {
+                  manifestPaths.add(manifest.path());
+                } else if (manifest.snapshotId() != null
+                    && deltaSnapshotIds.contains(manifest.snapshotId())) {
+                  manifestPaths.add(manifest.path());
+                }
+              }
+
+            } catch (Exception e) {
+              throw new RuntimeException(
+                  "Failed to read manifests for snapshot " + snapshotId, e);
+            } finally {
+              semaphore.release(); // the task is done (success or failure): free its in-flight slot
+            }
+            return null;
+          });
+          taskSubmitted = true;
+          submittedTasks++;
+        } finally {
+          if (!taskSubmitted) {
+            semaphore.release();
+          }
+        }
+        Future<Void> done;
+        while ((done = completionService.poll()) != null) { // poll() never blocks: drain only finished tasks
+          done.get(); // surfaces a task failure as ExecutionException
+          completedTasks++;
+        }
+      }
+      // Every snapshot has been submitted; block until the remaining tasks finish.
+      while (completedTasks < submittedTasks) {
+        completionService.take().get();
+        completedTasks++;
+      }
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+      executorService.shutdownNow();
+      throw new RuntimeException("Interrupted while processing manifests", e);
+
+    } catch (ExecutionException e) {
+      executorService.shutdownNow();
+      throw new RuntimeException(
+          "Failed to collect manifests to rewrite. "
+              + "The end version may contain invalid snapshots. "
+              + "Please choose an earlier version.",
+          e.getCause());
+    }
+
+    return manifestPaths;
+  }
+
+  private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
+    // Without a start version there is nothing to subtract, so the input set is returned as is.
+    if (startMetadata == null) {
+      return allSnapshots;
+    }
+    Set<Long> knownIds = new HashSet<>();
+    startMetadata.snapshots().forEach(existing -> knownIds.add(existing.snapshotId()));
+    return allSnapshots.stream()
+        .filter(candidate -> !knownIds.contains(candidate.snapshotId()))
+        .collect(Collectors.toSet());
+  }
 }
diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java
index e7a36da..97f129e 100644
--- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java
+++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java
@@ -27,8 +27,10 @@
 import java.util.Set;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -113,4 +115,12 @@ static void writeAsCsv(Set<Pair<String, String>> rows, OutputFile outputFile) {
       throw new RuntimeIOException(e);
     }
   }
+
+  static Set<Snapshot> snapshotSet(TableMetadata metadata) {
+    Set<Snapshot> snapshots = new HashSet<>(); // stays empty when there is no metadata
+    if (metadata != null) {
+      snapshots.addAll(metadata.snapshots());
+    }
+    return snapshots;
+  }
 }