API, Spark 4.0: Add create_file_list option to RewriteTablePathProcedure. (#13837)

diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
index a636470..4f65d46 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
@@ -86,6 +86,19 @@
    */
   RewriteTablePath stagingLocation(String stagingLocation);
 
+  /**
+   * Whether to create the file list.
+   *
+   * <p>The default value is true, which means the file list will be created. If set to false, the
+   * file list will not be created.
+   *
+   * @param createFileList true to create the file list, false to skip it
+   * @return this instance for method chaining
+   */
+  default RewriteTablePath createFileList(boolean createFileList) {
+    return this;
+  }
+
   /** The action result that contains a summary of the execution. */
   interface Result {
     /** Staging location of rewritten files */
@@ -112,5 +125,15 @@
 
     /** Name of latest metadata file version */
     String latestVersion();
+
+    /** Number of delete files with rewritten paths. */
+    default int rewrittenDeleteFilePathsCount() {
+      return 0;
+    }
+
+    /** Number of manifest files with rewritten paths. */
+    default int rewrittenManifestFilePathsCount() {
+      return 0;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java
index 1fb343d..6f8ab30 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java
@@ -29,5 +29,17 @@
 interface BaseRewriteTablePath extends RewriteTablePath {
 
   @Value.Immutable
-  interface Result extends RewriteTablePath.Result {}
+  interface Result extends RewriteTablePath.Result {
+    @Override
+    @Value.Default
+    default int rewrittenDeleteFilePathsCount() {
+      return RewriteTablePath.Result.super.rewrittenDeleteFilePathsCount();
+    }
+
+    @Override
+    @Value.Default
+    default int rewrittenManifestFilePathsCount() {
+      return RewriteTablePath.Result.super.rewrittenManifestFilePathsCount();
+    }
+  }
 }
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
index 71da131..0c83f34 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
@@ -22,13 +22,20 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.atIndex;
 
+import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.RewriteTablePathUtil;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.AnalysisException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -171,8 +178,93 @@
             "Cannot find provided version file %s in metadata log.", "v11.metadata.json");
   }
 
+  @TestTemplate
+  public void testRewriteTablePathWithoutFileList() {
+    String location = targetTableDir.toFile().toURI().toString();
+    Table table = validationCatalog.loadTable(tableIdent);
+    String metadataJson = TableUtil.metadataFileLocation(table);
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.rewrite_table_path(table => '%s', source_prefix => '%s', target_prefix => '%s', create_file_list => false)",
+            catalogName, tableIdent, table.location(), location);
+    assertThat(result).hasSize(1);
+    assertThat(result.get(0)[0])
+        .as("Should return correct latest version")
+        .isEqualTo(RewriteTablePathUtil.fileName(metadataJson));
+    assertThat(result.get(0)[1])
+        .as("Check if file list location is correctly marked as N/A when not generated")
+        .asString()
+        .isEqualTo("N/A");
+  }
+
   private void checkFileListLocationCount(String fileListLocation, long expectedFileCount) {
     long fileCount = spark.read().format("text").load(fileListLocation).count();
     assertThat(fileCount).isEqualTo(expectedFileCount);
   }
+
+  @TestTemplate
+  public void testRewriteTablePathWithManifestAndDeleteCounts() throws IOException {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Pair<CharSequence, Long>> rowsToDelete =
+        Lists.newArrayList(
+            Pair.of(
+                table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+                0L));
+
+    File file = new File(removePrefix(table.location()) + "/data/deletes.parquet");
+    String filePath = file.toURI().toString();
+    if (SparkCatalogConfig.REST.catalogName().equals(catalogName)) {
+      // We applied this special handling because the base path for
+      // matching the RESTCATALOG's Hive BaseLocation is represented
+      // in the form of an AbsolutePath.
+      filePath = file.getAbsolutePath().toString();
+    }
+
+    DeleteFile positionDeletes =
+        FileHelpers.writeDeleteFile(table, table.io().newOutputFile(filePath), rowsToDelete)
+            .first();
+
+    table.newRowDelta().addDeletes(positionDeletes).commit();
+
+    sql("INSERT INTO %s VALUES (4, 'd')", tableName);
+
+    String targetLocation = targetTableDir.toFile().toURI().toString();
+    String stagingLocation = staging.toFile().toURI().toString();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.rewrite_table_path("
+                + "table => '%s', "
+                + "source_prefix => '%s', "
+                + "target_prefix => '%s', "
+                + "staging_location => '%s', create_file_list => false)",
+            catalogName, tableIdent, table.location(), targetLocation, stagingLocation);
+
+    assertThat(result).hasSize(1);
+    Object[] row = result.get(0);
+
+    int rewrittenManifestFilesCount = ((Number) row[2]).intValue();
+    int rewrittenDeleteFilesCount = ((Number) row[3]).intValue();
+
+    assertThat(rewrittenDeleteFilesCount)
+        .as(
+            "Expected exactly 1 delete file to be rewritten, but found "
+                + rewrittenDeleteFilesCount)
+        .isEqualTo(1);
+
+    assertThat(rewrittenManifestFilesCount)
+        .as(
+            "Expected exactly 5 manifest files to be rewritten, but found "
+                + rewrittenManifestFilesCount)
+        .isEqualTo(5);
+  }
+
+  private String removePrefix(String path) {
+    return path.substring(path.lastIndexOf(":") + 1);
+  }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 9e610c4..fc0dc29 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -91,12 +91,14 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
   private static final String RESULT_LOCATION = "file-list";
+  static final String NOT_APPLICABLE = "N/A";
 
   private String sourcePrefix;
   private String targetPrefix;
   private String startVersionName;
   private String endVersionName;
   private String stagingDir;
+  private boolean createFileList = true;
 
   private final Table table;
   private Broadcast<Table> tableBroadcast = null;
@@ -151,6 +153,12 @@
   }
 
   @Override
+  public RewriteTablePath createFileList(boolean createFileListFlag) {
+    this.createFileList = createFileListFlag;
+    return this;
+  }
+
+  @Override
   public Result execute() {
     validateInputs();
     JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
@@ -158,12 +166,7 @@
   }
 
   private Result doExecute() {
-    String resultLocation = rebuildMetadata();
-    return ImmutableRewriteTablePath.Result.builder()
-        .stagingLocation(stagingDir)
-        .fileListLocation(resultLocation)
-        .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
-        .build();
+    return rebuildMetadata();
   }
 
   private void validateInputs() {
@@ -264,7 +267,7 @@
    *   <li>Get all files needed to move
    * </ul>
    */
-  private String rebuildMetadata() {
+  private Result rebuildMetadata() {
     TableMetadata startMetadata =
         startVersionName != null
             ? ((HasTableOperations) newStaticTable(startVersionName, table.io()))
@@ -289,6 +292,7 @@
             .reduce(new RewriteResult<>(), RewriteResult::append);
 
     // rebuild manifest files
+    Set<ManifestFile> metaFiles = rewriteManifestListResult.toRewrite();
     RewriteContentFileResult rewriteManifestResult =
         rewriteManifests(deltaSnapshots, endMetadata, rewriteManifestListResult.toRewrite());
 
@@ -300,12 +304,24 @@
             .collect(Collectors.toSet());
     rewritePositionDeletes(deleteFiles);
 
+    ImmutableRewriteTablePath.Result.Builder builder =
+        ImmutableRewriteTablePath.Result.builder()
+            .stagingLocation(stagingDir)
+            .rewrittenDeleteFilePathsCount(deleteFiles.size())
+            .rewrittenManifestFilePathsCount(metaFiles.size())
+            .latestVersion(RewriteTablePathUtil.fileName(endVersionName));
+
+    if (!createFileList) {
+      return builder.fileListLocation(NOT_APPLICABLE).build();
+    }
+
     Set<Pair<String, String>> copyPlan = Sets.newHashSet();
     copyPlan.addAll(rewriteVersionResult.copyPlan());
     copyPlan.addAll(rewriteManifestListResult.copyPlan());
     copyPlan.addAll(rewriteManifestResult.copyPlan());
+    String fileListLocation = saveFileList(copyPlan);
 
-    return saveFileList(copyPlan);
+    return builder.fileListLocation(fileListLocation).build();
   }
 
   private String saveFileList(Set<Pair<String, String>> filesToMove) {
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
index 1fbe1dd..f40ead0 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
@@ -50,6 +50,8 @@
       optionalInParameter("end_version", DataTypes.StringType);
   private static final ProcedureParameter STAGING_LOCATION_PARAM =
       optionalInParameter("staging_location", DataTypes.StringType);
+  private static final ProcedureParameter CREATE_FILE_LIST_PARAM =
+      optionalInParameter("create_file_list", DataTypes.BooleanType);
 
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
@@ -58,14 +60,22 @@
         TARGET_PREFIX_PARAM,
         START_VERSION_PARAM,
         END_VERSION_PARM,
-        STAGING_LOCATION_PARAM
+        STAGING_LOCATION_PARAM,
+        CREATE_FILE_LIST_PARAM
       };
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
           new StructField[] {
             new StructField("latest_version", DataTypes.StringType, true, Metadata.empty()),
-            new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty())
+            new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty()),
+            new StructField(
+                "rewritten_manifest_file_paths_count",
+                DataTypes.IntegerType,
+                true,
+                Metadata.empty()),
+            new StructField(
+                "rewritten_delete_file_paths_count", DataTypes.IntegerType, true, Metadata.empty())
           });
 
   public static SparkProcedures.ProcedureBuilder builder() {
@@ -100,6 +110,7 @@
     String startVersion = input.asString(START_VERSION_PARAM, null);
     String endVersion = input.asString(END_VERSION_PARM, null);
     String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);
+    boolean createFileList = input.asBoolean(CREATE_FILE_LIST_PARAM, true);
 
     return withIcebergTable(
         tableIdent,
@@ -116,6 +127,8 @@
             action.stagingLocation(stagingLocation);
           }
 
+          action.createFileList(createFileList);
+
           return asScanIterator(
               OUTPUT_TYPE,
               toOutputRows(action.rewriteLocationPrefix(sourcePrefix, targetPrefix).execute()));
@@ -126,7 +139,9 @@
     return new InternalRow[] {
       newInternalRow(
           UTF8String.fromString(result.latestVersion()),
-          UTF8String.fromString(result.fileListLocation()))
+          UTF8String.fromString(result.fileListLocation()),
+          result.rewrittenManifestFilePathsCount(),
+          result.rewrittenDeleteFilePathsCount())
     };
   }
 
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index ff68f7e..8399ae5 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.actions;
 
+import static org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_APPLICABLE;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -1234,6 +1235,24 @@
     assertThat(targetPath2).startsWith(targetTableLocation());
   }
 
+  @Test
+  public void testRewritePathWithoutCreateFileList() throws Exception {
+    String targetTableLocation = targetTableLocation();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, targetTableLocation)
+            .createFileList(false) // Disable file list creation
+            .execute();
+
+    assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
+
+    assertThat(result.fileListLocation())
+        .as("File list location should not be set when createFileList is false")
+        .isEqualTo(NOT_APPLICABLE);
+  }
+
   protected void checkFileNum(
       int versionFileCount,
       int manifestListCount,