| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg.delta; |
| |
| import static org.apache.spark.sql.functions.current_date; |
| import static org.apache.spark.sql.functions.date_add; |
| import static org.apache.spark.sql.functions.expr; |
| |
| import io.delta.standalone.DeltaLog; |
| import io.delta.standalone.Operation; |
| import io.delta.standalone.OptimisticTransaction; |
| import io.delta.standalone.VersionLog; |
| import io.delta.standalone.actions.Action; |
| import io.delta.standalone.actions.AddFile; |
| import io.delta.standalone.actions.RemoveFile; |
| import io.delta.standalone.exceptions.DeltaConcurrentModificationException; |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.sql.Timestamp; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.stream.Collectors; |
| import org.apache.commons.codec.DecoderException; |
| import org.apache.commons.codec.net.URLCodec; |
| import org.apache.iceberg.Snapshot; |
| import org.apache.iceberg.SnapshotRef; |
| import org.apache.iceberg.Table; |
| import org.apache.iceberg.catalog.TableIdentifier; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.spark.Spark3Util; |
| import org.apache.iceberg.spark.SparkCatalog; |
| import org.apache.iceberg.util.LocationUtil; |
| import org.apache.spark.sql.Dataset; |
| import org.apache.spark.sql.Row; |
| import org.apache.spark.sql.SaveMode; |
| import org.apache.spark.sql.connector.catalog.CatalogPlugin; |
| import org.apache.spark.sql.delta.catalog.DeltaCatalog; |
| import org.assertj.core.api.Assertions; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| @RunWith(Parameterized.class) |
| public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase { |
| private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source"; |
| private static final String DELTA_SOURCE_VALUE = "delta"; |
| private static final String ORIGINAL_LOCATION_PROP = "original_location"; |
| private static final String NAMESPACE = "delta_conversion_test"; |
| private static final String defaultSparkCatalog = "spark_catalog"; |
| private static final String icebergCatalogName = "iceberg_hive"; |
| private static Dataset<Row> typeTestDataFrame; |
| private static Dataset<Row> nestedDataFrame; |
| |
| @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}") |
| public static Object[][] parameters() { |
| return new Object[][] { |
| new Object[] { |
| icebergCatalogName, |
| SparkCatalog.class.getName(), |
| ImmutableMap.of( |
| "type", |
| "hive", |
| "default-namespace", |
| "default", |
| "parquet-enabled", |
| "true", |
| "cache-enabled", |
| "false" // Spark will delete tables using v1, leaving the cache out of sync |
| ) |
| } |
| }; |
| } |
| |
| @Rule public TemporaryFolder temp = new TemporaryFolder(); |
| |
| public TestSnapshotDeltaLakeTable( |
| String catalogName, String implementation, Map<String, String> config) { |
| super(catalogName, implementation, config); |
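    // use the Delta catalog for the default session catalog so the source Delta tables can be
    // created and modified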
| spark.conf().set("spark.sql.catalog." + defaultSparkCatalog, DeltaCatalog.class.getName()); |
| } |
| |
| @BeforeClass |
| public static void beforeClass() { |
| spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", NAMESPACE)); |
| |
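    // a DataFrame covering the primitive and nested types supported by the snapshot action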
| typeTestDataFrame = |
| spark |
| .range(0, 5, 1, 5) |
| .withColumnRenamed("id", "longCol") |
| .withColumn("intCol", expr("CAST(longCol AS INT)")) |
| .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) |
| .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) |
| .withColumn("dateCol", date_add(current_date(), 1)) |
| .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) |
| .withColumn("stringCol", expr("CAST(timestampCol AS STRING)")) |
| .withColumn("booleanCol", expr("longCol > 5")) |
| .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) |
| .withColumn("byteCol", expr("CAST(longCol AS BYTE)")) |
| .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) |
| .withColumn("shortCol", expr("CAST(longCol AS SHORT)")) |
| .withColumn("mapCol", expr("MAP(longCol, decimalCol)")) |
| .withColumn("arrayCol", expr("ARRAY(longCol)")) |
| .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)")); |
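    // a DataFrame with deeply nested struct, array, and map columns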
| nestedDataFrame = |
| spark |
| .range(0, 5, 1, 5) |
| .withColumn("longCol", expr("id")) |
| .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) |
| .withColumn("magic_number", expr("rand(5) * 100")) |
| .withColumn("dateCol", date_add(current_date(), 1)) |
| .withColumn("dateString", expr("CAST(dateCol AS STRING)")) |
| .withColumn("random1", expr("CAST(rand(5) * 100 as LONG)")) |
| .withColumn("random2", expr("CAST(rand(51) * 100 as LONG)")) |
| .withColumn("random3", expr("CAST(rand(511) * 100 as LONG)")) |
| .withColumn("random4", expr("CAST(rand(15) * 100 as LONG)")) |
| .withColumn("random5", expr("CAST(rand(115) * 100 as LONG)")) |
| .withColumn("innerStruct1", expr("STRUCT(random1, random2)")) |
| .withColumn("innerStruct2", expr("STRUCT(random3, random4)")) |
| .withColumn("structCol1", expr("STRUCT(innerStruct1, innerStruct2)")) |
| .withColumn( |
| "innerStruct3", |
| expr("STRUCT(SHA1(CAST(random5 AS BINARY)), SHA1(CAST(random1 AS BINARY)))")) |
| .withColumn( |
| "structCol2", |
| expr( |
| "STRUCT(innerStruct3, STRUCT(SHA1(CAST(random2 AS BINARY)), SHA1(CAST(random3 AS BINARY))))")) |
| .withColumn("arrayCol", expr("ARRAY(random1, random2, random3, random4, random5)")) |
| .withColumn("arrayStructCol", expr("ARRAY(innerStruct1, innerStruct1, innerStruct1)")) |
| .withColumn("mapCol1", expr("MAP(structCol1, structCol2)")) |
| .withColumn("mapCol2", expr("MAP(longCol, dateString)")) |
| .withColumn("mapCol3", expr("MAP(dateCol, arrayCol)")) |
| .withColumn("structCol3", expr("STRUCT(structCol2, mapCol3, arrayCol)")); |
| } |
| |
| @AfterClass |
| public static void afterClass() { |
| spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", NAMESPACE)); |
| } |
| |
| @Test |
| public void testBasicSnapshotPartitioned() throws IOException { |
| String partitionedIdentifier = destName(defaultSparkCatalog, "partitioned_table"); |
| String partitionedLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id"); |
| spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3"); |
| spark.sql("UPDATE " + partitionedIdentifier + " SET id=3 WHERE id=1"); |
| |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_partitioned_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, partitionedLocation) |
| .execute(); |
| |
| checkSnapshotIntegrity( |
| partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0); |
| checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0); |
| checkIcebergTableLocation(newTableIdentifier, partitionedLocation); |
| } |
| |
| @Test |
| public void testBasicSnapshotUnpartitioned() throws IOException { |
| String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table"); |
| String unpartitionedLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null); |
| spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3"); |
| spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1"); |
| |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_unpartitioned_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, unpartitionedLocation) |
| .execute(); |
| |
| checkSnapshotIntegrity( |
| unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0); |
| checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0); |
| checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation); |
| } |
| |
| @Test |
| public void testSnapshotWithNewLocation() throws IOException { |
| String partitionedIdentifier = destName(defaultSparkCatalog, "partitioned_table"); |
| String partitionedLocation = temp.newFolder().toURI().toString(); |
| String newIcebergTableLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id"); |
| spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3"); |
| spark.sql("UPDATE " + partitionedIdentifier + " SET id=3 WHERE id=1"); |
| |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_new_table_location_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, partitionedLocation) |
| .tableLocation(newIcebergTableLocation) |
| .execute(); |
| |
| checkSnapshotIntegrity( |
| partitionedLocation, partitionedIdentifier, newTableIdentifier, result, 0); |
| checkTagContentAndOrder(partitionedLocation, newTableIdentifier, 0); |
| checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation); |
| } |
| |
| @Test |
| public void testSnapshotWithAdditionalProperties() throws IOException { |
| String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table"); |
| String unpartitionedLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null); |
| spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3"); |
| spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1"); |
| |
    // add some table properties to the original Delta table
| spark.sql( |
| "ALTER TABLE " |
| + unpartitionedIdentifier |
| + " SET TBLPROPERTIES ('foo'='bar', 'test0'='test0')"); |
| |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_additional_properties_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, unpartitionedLocation) |
| .tableProperty("test1", "test1") |
            .tableProperties(
                ImmutableMap.of(
                    "test2", "test2",
                    "test3", "test3",
                    "test4", "test4")) // add additional Iceberg table properties
| .execute(); |
| |
| checkSnapshotIntegrity( |
| unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result, 0); |
| checkTagContentAndOrder(unpartitionedLocation, newTableIdentifier, 0); |
| checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation); |
| checkIcebergTableProperties( |
| newTableIdentifier, |
| ImmutableMap.of( |
| "foo", "bar", "test0", "test0", "test1", "test1", "test2", "test2", "test3", "test3", |
| "test4", "test4"), |
| unpartitionedLocation); |
| } |
| |
| @Test |
| public void testSnapshotTableWithExternalDataFiles() throws IOException { |
| String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table"); |
| String externalDataFilesIdentifier = destName(defaultSparkCatalog, "external_data_files_table"); |
| String unpartitionedLocation = temp.newFolder().toURI().toString(); |
| String externalDataFilesTableLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null); |
| spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3"); |
| spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1"); |
| |
| writeDeltaTable( |
| nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null); |
    // Add parquet files from the unpartitioned table to external_data_files_table. The newly
    // added parquet files are not located under the external table's own location.
| addExternalDatafiles(externalDataFilesTableLocation, unpartitionedLocation); |
| |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_external_data_files_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, externalDataFilesTableLocation) |
| .execute(); |
| checkSnapshotIntegrity( |
| externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result, 0); |
| checkTagContentAndOrder(externalDataFilesTableLocation, newTableIdentifier, 0); |
| checkIcebergTableLocation(newTableIdentifier, externalDataFilesTableLocation); |
| checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation); |
| } |
| |
| @Test |
| public void testSnapshotSupportedTypes() throws IOException { |
| String typeTestIdentifier = destName(defaultSparkCatalog, "type_test_table"); |
| String typeTestTableLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol"); |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_type_test_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, typeTestTableLocation) |
| .execute(); |
| checkSnapshotIntegrity( |
| typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result, 0); |
| checkTagContentAndOrder(typeTestTableLocation, newTableIdentifier, 0); |
| checkIcebergTableLocation(newTableIdentifier, typeTestTableLocation); |
| checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), typeTestTableLocation); |
| } |
| |
| @Test |
| public void testSnapshotVacuumTable() throws IOException { |
| String vacuumTestIdentifier = destName(defaultSparkCatalog, "vacuum_test_table"); |
| String vacuumTestTableLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(nestedDataFrame, vacuumTestIdentifier, vacuumTestTableLocation, null); |
| Random random = new Random(); |
| for (int i = 0; i < 13; i++) { |
| spark.sql( |
| "UPDATE " |
| + vacuumTestIdentifier |
| + " SET magic_number = " |
| + random.nextDouble() |
| + " WHERE id = 1"); |
| } |
| |
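    // delete the first Delta log file and vacuum away all unreferenced data files so that the
    // current version (13) becomes the earliest version that can still be fully reconstructed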
| boolean deleteResult = |
| Files.deleteIfExists( |
| Paths.get( |
| URI.create( |
| vacuumTestTableLocation.concat("/_delta_log/00000000000000000000.json")))); |
| Assertions.assertThat(deleteResult).isTrue(); |
| spark.sql("VACUUM " + vacuumTestIdentifier + " RETAIN 0 HOURS"); |
| |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_vacuum_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, vacuumTestTableLocation) |
| .execute(); |
| checkSnapshotIntegrity( |
| vacuumTestTableLocation, vacuumTestIdentifier, newTableIdentifier, result, 13); |
| checkTagContentAndOrder(vacuumTestTableLocation, newTableIdentifier, 13); |
| checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation); |
| } |
| |
| @Test |
| public void testSnapshotLogCleanTable() throws IOException { |
| String logCleanTestIdentifier = destName(defaultSparkCatalog, "log_clean_test_table"); |
| String logCleanTestTableLocation = temp.newFolder().toURI().toString(); |
| |
| writeDeltaTable(nestedDataFrame, logCleanTestIdentifier, logCleanTestTableLocation, "id"); |
| Random random = new Random(); |
| for (int i = 0; i < 25; i++) { |
| spark.sql( |
| "UPDATE " |
| + logCleanTestIdentifier |
| + " SET magic_number = " |
| + random.nextDouble() |
| + " WHERE id = 1"); |
| } |
| |
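    // delete the first Delta log file; Delta checkpoints the log every 10 commits by default, so
    // version 10 (the first checkpoint) becomes the earliest constructable version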
| boolean deleteResult = |
| Files.deleteIfExists( |
| Paths.get( |
| URI.create( |
| logCleanTestTableLocation.concat("/_delta_log/00000000000000000000.json")))); |
| Assertions.assertThat(deleteResult).isTrue(); |
| |
| String newTableIdentifier = destName(icebergCatalogName, "iceberg_log_clean_table"); |
| SnapshotDeltaLakeTable.Result result = |
| DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( |
| spark, newTableIdentifier, logCleanTestTableLocation) |
| .execute(); |
| checkSnapshotIntegrity( |
| logCleanTestTableLocation, logCleanTestIdentifier, newTableIdentifier, result, 10); |
| checkTagContentAndOrder(logCleanTestTableLocation, newTableIdentifier, 10); |
| checkIcebergTableLocation(newTableIdentifier, logCleanTestTableLocation); |
| } |
| |
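  /**
   * Verifies that the snapshot Iceberg table contains exactly the same rows as the source Delta
   * table, and that the reported data file count matches the count derived from the Delta log
   * starting at the first constructable version.
   */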
| private void checkSnapshotIntegrity( |
| String deltaTableLocation, |
| String deltaTableIdentifier, |
| String icebergTableIdentifier, |
| SnapshotDeltaLakeTable.Result snapshotReport, |
| long firstConstructableVersion) { |
| DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); |
| |
| List<Row> deltaTableContents = |
| spark.sql("SELECT * FROM " + deltaTableIdentifier).collectAsList(); |
| List<Row> icebergTableContents = |
| spark.sql("SELECT * FROM " + icebergTableIdentifier).collectAsList(); |
| |
| Assertions.assertThat(deltaTableContents).hasSize(icebergTableContents.size()); |
| Assertions.assertThat(snapshotReport.snapshotDataFilesCount()) |
| .isEqualTo(countDataFilesInDeltaLakeTable(deltaLog, firstConstructableVersion)); |
| Assertions.assertThat(icebergTableContents) |
| .containsExactlyInAnyOrderElementsOf(deltaTableContents); |
| } |
| |
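  /**
   * Verifies that every constructable Delta version maps, in commit order, to an Iceberg snapshot
   * that is tagged both by version (delta-version-*) and by commit timestamp (delta-ts-*).
   */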
| private void checkTagContentAndOrder( |
| String deltaTableLocation, String icebergTableIdentifier, long firstConstructableVersion) { |
| DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); |
| long currentVersion = deltaLog.snapshot().getVersion(); |
| Table icebergTable = getIcebergTable(icebergTableIdentifier); |
| Map<String, SnapshotRef> icebergSnapshotRefs = icebergTable.refs(); |
| List<Snapshot> icebergSnapshots = Lists.newArrayList(icebergTable.snapshots()); |
| |
| Assertions.assertThat(icebergSnapshots.size()) |
| .isEqualTo(currentVersion - firstConstructableVersion + 1); |
| |
| for (int i = 0; i < icebergSnapshots.size(); i++) { |
| long deltaVersion = firstConstructableVersion + i; |
| Snapshot currentIcebergSnapshot = icebergSnapshots.get(i); |
| |
      String expectedVersionTag = "delta-version-" + deltaVersion;
      SnapshotRef versionRef = icebergSnapshotRefs.get(expectedVersionTag);
      Assertions.assertThat(versionRef).isNotNull();
      Assertions.assertThat(versionRef.isTag()).isTrue();
      Assertions.assertThat(versionRef.snapshotId())
          .isEqualTo(currentIcebergSnapshot.snapshotId());
| |
| Timestamp deltaVersionTimestamp = deltaLog.getCommitInfoAt(deltaVersion).getTimestamp(); |
| Assertions.assertThat(deltaVersionTimestamp).isNotNull(); |
| String expectedTimestampTag = "delta-ts-" + deltaVersionTimestamp.getTime(); |
| |
      SnapshotRef timestampRef = icebergSnapshotRefs.get(expectedTimestampTag);
      Assertions.assertThat(timestampRef).isNotNull();
      Assertions.assertThat(timestampRef.isTag()).isTrue();
      Assertions.assertThat(timestampRef.snapshotId())
          .isEqualTo(currentIcebergSnapshot.snapshotId());
| } |
| } |
| |
| private void checkIcebergTableLocation(String icebergTableIdentifier, String expectedLocation) { |
| Table icebergTable = getIcebergTable(icebergTableIdentifier); |
| Assertions.assertThat(icebergTable.location()) |
| .isEqualTo(LocationUtil.stripTrailingSlash(expectedLocation)); |
| } |
| |
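  /**
   * Verifies that the Iceberg table carries the fixed snapshot-source property, all expected
   * additional properties, and the original Delta table location.
   */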
| private void checkIcebergTableProperties( |
| String icebergTableIdentifier, |
| Map<String, String> expectedAdditionalProperties, |
| String deltaTableLocation) { |
| Table icebergTable = getIcebergTable(icebergTableIdentifier); |
| ImmutableMap.Builder<String, String> expectedPropertiesBuilder = ImmutableMap.builder(); |
    // The snapshot action adds a fixed set of properties to the table
| expectedPropertiesBuilder.put(SNAPSHOT_SOURCE_PROP, DELTA_SOURCE_VALUE); |
| expectedPropertiesBuilder.putAll(expectedAdditionalProperties); |
| ImmutableMap<String, String> expectedProperties = expectedPropertiesBuilder.build(); |
| |
| Assertions.assertThat(icebergTable.properties().entrySet()) |
| .containsAll(expectedProperties.entrySet()); |
| Assertions.assertThat(icebergTable.properties()) |
| .containsEntry(ORIGINAL_LOCATION_PROP, deltaTableLocation); |
| } |
| |
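  /**
   * Verifies that the Iceberg table references the Delta table's data files in place: every added
   * data file path is absolute and appears in the Delta log's current file list.
   */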
| private void checkDataFilePathsIntegrity( |
| String icebergTableIdentifier, String deltaTableLocation) { |
| Table icebergTable = getIcebergTable(icebergTableIdentifier); |
| DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); |
    // checkSnapshotIntegrity already verifies that the number of data files in the snapshot
    // Iceberg table equals the number in the original Delta Lake table
| List<String> deltaTableDataFilePaths = |
| deltaLog.update().getAllFiles().stream() |
| .map(f -> getFullFilePath(f.getPath(), deltaLog.getPath().toString())) |
| .collect(Collectors.toList()); |
| icebergTable |
| .currentSnapshot() |
| .addedDataFiles(icebergTable.io()) |
| .forEach( |
| dataFile -> { |
| Assertions.assertThat(URI.create(dataFile.path().toString()).isAbsolute()).isTrue(); |
| Assertions.assertThat(deltaTableDataFilePaths).contains(dataFile.path().toString()); |
| }); |
| } |
| |
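  // resolves the identifier against the current Spark catalog and loads the table through the
  // corresponding Iceberg catalog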
| private Table getIcebergTable(String icebergTableIdentifier) { |
| CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog(); |
| Spark3Util.CatalogAndIdentifier catalogAndIdent = |
| Spark3Util.catalogAndIdentifier( |
| "test catalog", spark, icebergTableIdentifier, defaultCatalog); |
| return Spark3Util.loadIcebergCatalog(spark, catalogAndIdent.catalog().name()) |
| .loadTable(TableIdentifier.parse(catalogAndIdent.identifier().toString())); |
| } |
| |
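  // builds a namespace-qualified table name; the default Spark catalog needs no explicit prefix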
| private String destName(String catalogName, String dest) { |
| if (catalogName.equals(defaultSparkCatalog)) { |
| return NAMESPACE + "." + catalogName + "_" + dest; |
| } |
| return catalogName + "." + NAMESPACE + "." + catalogName + "_" + dest; |
| } |
| |
| /** |
| * Add parquet files manually to a delta lake table to mock the situation that some data files are |
| * not in the same location as the delta lake table. The case that {@link AddFile#getPath()} or |
| * {@link RemoveFile#getPath()} returns absolute path. |
| * |
| * <p>The known <a href="https://github.com/delta-io/connectors/issues/380">issue</a> makes it |
| * necessary to manually rebuild the AddFile to avoid deserialization error when committing the |
| * transaction. |
| */ |
| private void addExternalDatafiles( |
| String targetDeltaTableLocation, String sourceDeltaTableLocation) { |
| DeltaLog targetLog = |
| DeltaLog.forTable(spark.sessionState().newHadoopConf(), targetDeltaTableLocation); |
| OptimisticTransaction transaction = targetLog.startTransaction(); |
| DeltaLog sourceLog = |
| DeltaLog.forTable(spark.sessionState().newHadoopConf(), sourceDeltaTableLocation); |
| List<AddFile> newFiles = |
| sourceLog.update().getAllFiles().stream() |
| .map( |
| f -> |
| AddFile.builder( |
| getFullFilePath(f.getPath(), sourceLog.getPath().toString()), |
| f.getPartitionValues(), |
| f.getSize(), |
| System.currentTimeMillis(), |
| true) |
| .build()) |
| .collect(Collectors.toList()); |
| try { |
| transaction.commit(newFiles, new Operation(Operation.Name.UPDATE), "Delta-Lake/2.2.0"); |
| } catch (DeltaConcurrentModificationException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
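  /**
   * Returns the URL-decoded full path of a data file: absolute paths are returned as-is, relative
   * paths are resolved against the table root.
   */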
| private static String getFullFilePath(String path, String tableRoot) { |
| URI dataFileUri = URI.create(path); |
| try { |
| String decodedPath = new URLCodec().decode(path); |
| if (dataFileUri.isAbsolute()) { |
| return decodedPath; |
| } else { |
| return tableRoot + File.separator + decodedPath; |
| } |
| } catch (DecoderException e) { |
| throw new IllegalArgumentException(String.format("Cannot decode path %s", path), e); |
| } |
| } |
| |
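  // (re)creates a Delta table at the given path, optionally partitioned by the given column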
| private void writeDeltaTable( |
| Dataset<Row> df, String identifier, String path, String partitionColumn) { |
| spark.sql(String.format("DROP TABLE IF EXISTS %s", identifier)); |
| if (partitionColumn != null) { |
| df.write() |
| .format("delta") |
| .mode(SaveMode.Append) |
| .option("path", path) |
| .partitionBy(partitionColumn) |
| .saveAsTable(identifier); |
| } else { |
| df.write().format("delta").mode(SaveMode.Append).option("path", path).saveAsTable(identifier); |
| } |
| } |
| |
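  /**
   * Counts the data files referenced by the Delta log: all files in the snapshot of the first
   * constructable version, plus every AddFile action committed after it.
   */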
| private long countDataFilesInDeltaLakeTable(DeltaLog deltaLog, long firstConstructableVersion) { |
| long dataFilesCount = 0; |
| |
| List<AddFile> initialDataFiles = |
| deltaLog.getSnapshotForVersionAsOf(firstConstructableVersion).getAllFiles(); |
| dataFilesCount += initialDataFiles.size(); |
| |
| Iterator<VersionLog> versionLogIterator = |
| deltaLog.getChanges( |
            firstConstructableVersion + 1,
            false); // do not throw an exception when data loss is detected
| |
| while (versionLogIterator.hasNext()) { |
| VersionLog versionLog = versionLogIterator.next(); |
| List<Action> addFiles = |
| versionLog.getActions().stream() |
| .filter(action -> action instanceof AddFile) |
| .collect(Collectors.toList()); |
| dataFilesCount += addFiles.size(); |
| } |
| |
| return dataFilesCount; |
| } |
| } |