API, Spark 4.0: Add create_file_list option to RewriteTablePathProcedure. (#13837)
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
index a636470..4f65d46 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
@@ -86,6 +86,19 @@
*/
RewriteTablePath stagingLocation(String stagingLocation);
+ /**
+ * Whether to write the file list (the copy plan mapping source paths to target paths).
+ *
+ * <p>Defaults to {@code true}, meaning the file list is written. If set to {@code false}, the
+ * file list is skipped and the result reports no file list location.
+ *
+ * @param createFileList true to create the file list, false to skip it
+ * @return this instance for method chaining
+ */
+ default RewriteTablePath createFileList(boolean createFileList) {
+ return this;
+ }
+
/** The action result that contains a summary of the execution. */
interface Result {
/** Staging location of rewritten files */
@@ -112,5 +125,15 @@
/** Name of latest metadata file version */
String latestVersion();
+
+ /** Number of delete files with rewritten paths. */
+ default int rewrittenDeleteFilePathsCount() {
+ return 0;
+ }
+
+ /** Number of manifest files with rewritten paths. */
+ default int rewrittenManifestFilePathsCount() {
+ return 0;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java
index 1fb343d..6f8ab30 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteTablePath.java
@@ -29,5 +29,17 @@
interface BaseRewriteTablePath extends RewriteTablePath {
@Value.Immutable
- interface Result extends RewriteTablePath.Result {}
+ interface Result extends RewriteTablePath.Result {
+ @Override
+ @Value.Default
+ default int rewrittenDeleteFilePathsCount() {
+ return RewriteTablePath.Result.super.rewrittenDeleteFilePathsCount();
+ }
+
+ @Override
+ @Value.Default
+ default int rewrittenManifestFilePathsCount() {
+ return RewriteTablePath.Result.super.rewrittenManifestFilePathsCount();
+ }
+ }
}
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
index 71da131..0c83f34 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteTablePathProcedure.java
@@ -22,13 +22,20 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.atIndex;
+import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.AnalysisException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -171,8 +178,93 @@
"Cannot find provided version file %s in metadata log.", "v11.metadata.json");
}
+ @TestTemplate
+ public void testRewriteTablePathWithoutFileList() {
+ String location = targetTableDir.toFile().toURI().toString();
+ Table table = validationCatalog.loadTable(tableIdent);
+ String metadataJson = TableUtil.metadataFileLocation(table);
+
+ List<Object[]> result =
+ sql(
+ "CALL %s.system.rewrite_table_path(table => '%s', source_prefix => '%s', target_prefix => '%s', create_file_list => false)",
+ catalogName, tableIdent, table.location(), location);
+ assertThat(result).hasSize(1);
+ assertThat(result.get(0)[0])
+ .as("Should return correct latest version")
+ .isEqualTo(RewriteTablePathUtil.fileName(metadataJson));
+ assertThat(result.get(0)[1])
+ .as("Check if file list location is correctly marked as N/A when not generated")
+ .asString()
+ .isEqualTo("N/A");
+ }
+
private void checkFileListLocationCount(String fileListLocation, long expectedFileCount) {
long fileCount = spark.read().format("text").load(fileListLocation).count();
assertThat(fileCount).isEqualTo(expectedFileCount);
}
+
+ @TestTemplate
+ public void testRewriteTablePathWithManifestAndDeleteCounts() throws IOException {
+ sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ List<Pair<CharSequence, Long>> rowsToDelete =
+ Lists.newArrayList(
+ Pair.of(
+ table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+ 0L));
+
+ File file = new File(removePrefix(table.location()) + "/data/deletes.parquet");
+ String filePath = file.toURI().toString();
+ if (SparkCatalogConfig.REST.catalogName().equals(catalogName)) {
+ // Special handling for the REST catalog: its Hive base location is represented as an
+ // absolute path rather than a URI, so the delete file path must also be an absolute
+ // path for the table's source prefix to match.
+ filePath = file.getAbsolutePath().toString();
+ }
+
+ DeleteFile positionDeletes =
+ FileHelpers.writeDeleteFile(table, table.io().newOutputFile(filePath), rowsToDelete)
+ .first();
+
+ table.newRowDelta().addDeletes(positionDeletes).commit();
+
+ sql("INSERT INTO %s VALUES (4, 'd')", tableName);
+
+ String targetLocation = targetTableDir.toFile().toURI().toString();
+ String stagingLocation = staging.toFile().toURI().toString();
+
+ List<Object[]> result =
+ sql(
+ "CALL %s.system.rewrite_table_path("
+ + "table => '%s', "
+ + "source_prefix => '%s', "
+ + "target_prefix => '%s', "
+ + "staging_location => '%s', create_file_list => false)",
+ catalogName, tableIdent, table.location(), targetLocation, stagingLocation);
+
+ assertThat(result).hasSize(1);
+ Object[] row = result.get(0);
+
+ int rewrittenManifestFilesCount = ((Number) row[2]).intValue();
+ int rewrittenDeleteFilesCount = ((Number) row[3]).intValue();
+
+ assertThat(rewrittenDeleteFilesCount)
+ .as(
+ "Expected exactly 1 delete file to be rewritten, but found "
+ + rewrittenDeleteFilesCount)
+ .isEqualTo(1);
+
+ assertThat(rewrittenManifestFilesCount)
+ .as(
+ "Expected exactly 5 manifest files to be rewritten, but found "
+ + rewrittenManifestFilesCount)
+ .isEqualTo(5);
+ }
+
+ private String removePrefix(String path) {
+ return path.substring(path.lastIndexOf(":") + 1);
+ }
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index 9e610c4..fc0dc29 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -91,12 +91,14 @@
private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
private static final String RESULT_LOCATION = "file-list";
+ static final String NOT_APPLICABLE = "N/A";
private String sourcePrefix;
private String targetPrefix;
private String startVersionName;
private String endVersionName;
private String stagingDir;
+ private boolean createFileList = true;
private final Table table;
private Broadcast<Table> tableBroadcast = null;
@@ -151,6 +153,12 @@
}
@Override
+ public RewriteTablePath createFileList(boolean createFileListFlag) {
+ this.createFileList = createFileListFlag;
+ return this;
+ }
+
+ @Override
public Result execute() {
validateInputs();
JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
@@ -158,12 +166,7 @@
}
private Result doExecute() {
- String resultLocation = rebuildMetadata();
- return ImmutableRewriteTablePath.Result.builder()
- .stagingLocation(stagingDir)
- .fileListLocation(resultLocation)
- .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
- .build();
+ return rebuildMetadata();
}
private void validateInputs() {
@@ -264,7 +267,7 @@
* <li>Get all files needed to move
* </ul>
*/
- private String rebuildMetadata() {
+ private Result rebuildMetadata() {
TableMetadata startMetadata =
startVersionName != null
? ((HasTableOperations) newStaticTable(startVersionName, table.io()))
@@ -289,6 +292,7 @@
.reduce(new RewriteResult<>(), RewriteResult::append);
// rebuild manifest files
+ Set<ManifestFile> metaFiles = rewriteManifestListResult.toRewrite();
RewriteContentFileResult rewriteManifestResult =
rewriteManifests(deltaSnapshots, endMetadata, rewriteManifestListResult.toRewrite());
@@ -300,12 +304,24 @@
.collect(Collectors.toSet());
rewritePositionDeletes(deleteFiles);
+ ImmutableRewriteTablePath.Result.Builder builder =
+ ImmutableRewriteTablePath.Result.builder()
+ .stagingLocation(stagingDir)
+ .rewrittenDeleteFilePathsCount(deleteFiles.size())
+ .rewrittenManifestFilePathsCount(metaFiles.size())
+ .latestVersion(RewriteTablePathUtil.fileName(endVersionName));
+
+ if (!createFileList) {
+ return builder.fileListLocation(NOT_APPLICABLE).build();
+ }
+
Set<Pair<String, String>> copyPlan = Sets.newHashSet();
copyPlan.addAll(rewriteVersionResult.copyPlan());
copyPlan.addAll(rewriteManifestListResult.copyPlan());
copyPlan.addAll(rewriteManifestResult.copyPlan());
+ String fileListLocation = saveFileList(copyPlan);
- return saveFileList(copyPlan);
+ return builder.fileListLocation(fileListLocation).build();
}
private String saveFileList(Set<Pair<String, String>> filesToMove) {
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
index 1fbe1dd..f40ead0 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java
@@ -50,6 +50,8 @@
optionalInParameter("end_version", DataTypes.StringType);
private static final ProcedureParameter STAGING_LOCATION_PARAM =
optionalInParameter("staging_location", DataTypes.StringType);
+ private static final ProcedureParameter CREATE_FILE_LIST_PARAM =
+ optionalInParameter("create_file_list", DataTypes.BooleanType);
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
@@ -58,14 +60,22 @@
TARGET_PREFIX_PARAM,
START_VERSION_PARAM,
END_VERSION_PARM,
- STAGING_LOCATION_PARAM
+ STAGING_LOCATION_PARAM,
+ CREATE_FILE_LIST_PARAM
};
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("latest_version", DataTypes.StringType, true, Metadata.empty()),
- new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty())
+ new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty()),
+ new StructField(
+ "rewritten_manifest_file_paths_count",
+ DataTypes.IntegerType,
+ true,
+ Metadata.empty()),
+ new StructField(
+ "rewritten_delete_file_paths_count", DataTypes.IntegerType, true, Metadata.empty())
});
public static SparkProcedures.ProcedureBuilder builder() {
@@ -100,6 +110,7 @@
String startVersion = input.asString(START_VERSION_PARAM, null);
String endVersion = input.asString(END_VERSION_PARM, null);
String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);
+ boolean createFileList = input.asBoolean(CREATE_FILE_LIST_PARAM, true);
return withIcebergTable(
tableIdent,
@@ -116,6 +127,8 @@
action.stagingLocation(stagingLocation);
}
+ action.createFileList(createFileList);
+
return asScanIterator(
OUTPUT_TYPE,
toOutputRows(action.rewriteLocationPrefix(sourcePrefix, targetPrefix).execute()));
@@ -126,7 +139,9 @@
return new InternalRow[] {
newInternalRow(
UTF8String.fromString(result.latestVersion()),
- UTF8String.fromString(result.fileListLocation()))
+ UTF8String.fromString(result.fileListLocation()),
+ result.rewrittenManifestFilePathsCount(),
+ result.rewrittenDeleteFilePathsCount())
};
}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index ff68f7e..8399ae5 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.actions;
+import static org.apache.iceberg.spark.actions.RewriteTablePathSparkAction.NOT_APPLICABLE;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -1234,6 +1235,24 @@
assertThat(targetPath2).startsWith(targetTableLocation());
}
+ @Test
+ public void testRewritePathWithoutCreateFileList() throws Exception {
+ String targetTableLocation = targetTableLocation();
+
+ RewriteTablePath.Result result =
+ actions()
+ .rewriteTablePath(table)
+ .rewriteLocationPrefix(tableLocation, targetTableLocation)
+ .createFileList(false) // Disable file list creation
+ .execute();
+
+ assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
+
+ assertThat(result.fileListLocation())
+ .as("File list location should not be set when createFileList is false")
+ .isEqualTo(NOT_APPLICABLE);
+ }
+
protected void checkFileNum(
int versionFileCount,
int manifestListCount,