Evolve iceberg table schema for partition copy (#4142)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index 30dc209..e7c83ee 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -26,6 +26,7 @@
import java.util.concurrent.ExecutionException;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import com.github.rholder.retry.Attempt;
@@ -57,6 +58,14 @@
@Slf4j
public class IcebergOverwritePartitionsStep implements CommitStep {
private final String destTableIdStr;
+ /**
+ * The catalog must enforce that the updated schema is backward compatible during the
+ * partition overwrite step; otherwise previously written partitions may become unreadable.
+ * This schema is determined at the start of the copy job, before data file discovery, and
+ * is committed before the partitions are overwritten. It is the catalog's responsibility
+ * to ensure that no conflicting commits occur while the copy job runs.
+ */
+ private final Schema updatedSchema;
private final Properties properties;
// data files are populated once all the copy tasks are done. Each IcebergPartitionCopyableFile has a serialized data file
@Setter private List<DataFile> dataFiles;
@@ -64,6 +73,8 @@
private final String partitionValue;
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
".catalog.overwrite.partitions.retries";
+ public static final String SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
+ ".schema.update.retries";
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
RETRY_TIMES, 3,
@@ -75,8 +86,9 @@
* @param destTableIdStr the identifier of the destination table as a string
* @param properties the properties containing configuration
*/
- public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Properties properties) {
+ public IcebergOverwritePartitionsStep(String destTableIdStr, Schema updatedSchema, String partitionColName, String partitionValue, Properties properties) {
this.destTableIdStr = destTableIdStr;
+ this.updatedSchema = updatedSchema;
this.partitionColName = partitionColName;
this.partitionValue = partitionValue;
this.properties = properties;
@@ -108,6 +120,13 @@
dataFiles.size(),
dataFiles.get(0).path()
);
+ // update the destination table schema first so the newly added data files stay readable
+ Retryer<Void> schemaUpdateRetryer = createSchemaUpdateRetryer();
+ schemaUpdateRetryer.call(() -> {
+ destTable.updateSchema(this.updatedSchema, false);
+ return null;
+ });
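+ // if the schema update exhausts its retries the exception propagates, aborting the commit
+ // step before any partition data is overwritten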
+ // overwrite partitions
Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
overwritePartitionsRetryer.call(() -> {
destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
@@ -146,17 +165,30 @@
? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
: ConfigFactory.empty();
+ return getRetryer(retryerOverridesConfig);
+ }
+
+ private Retryer<Void> createSchemaUpdateRetryer() {
+ Config config = ConfigFactory.parseProperties(this.properties);
+ Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+ ? config.getConfig(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+ : ConfigFactory.empty();
+
+ return getRetryer(retryerOverridesConfig);
+ }
+
+ private Retryer<Void> getRetryer(Config retryerOverridesConfig) {
return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() {
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- if (attempt.hasException()) {
- String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]",
- destTableIdStr,
- attempt.getAttemptNumber(),
- Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
- log.warn(msg, attempt.getExceptionCause());
- }
- }
- }));
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ if (attempt.hasException()) {
+ String msg = String.format("~%s~ Exception occurred [attempt: %d; elapsed: %s]",
+ destTableIdStr,
+ attempt.getAttemptNumber(),
+ Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+ log.warn(msg, attempt.getExceptionCause());
+ }
+ }
+ }));
}
}
\ No newline at end of file
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 5600b47..1cc15c1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -37,6 +37,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
@@ -98,6 +99,7 @@
// Differences are getting data files, copying ancestor permission and adding post publish steps
String fileSet = this.getFileSetId();
IcebergTable srcIcebergTable = getSrcIcebergTable();
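+ // capture the source schema before data file discovery; the overwrite step commits it to
+ // the destination table just before overwriting partitions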
+ Schema srcTableSchema = srcIcebergTable.accessTableMetadata().schema();
List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
if (srcDataFiles.isEmpty()) {
@@ -128,7 +130,7 @@
List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs, srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec, copyConfig);
// Adding this check to avoid adding post publish step when there are no files to copy.
if (CollectionUtils.isNotEmpty(copyEntities)) {
- copyEntities.add(createOverwritePostPublishStep());
+ copyEntities.add(createOverwritePostPublishStep(srcTableSchema));
}
log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
@@ -191,9 +193,16 @@
return new Path(fileDir, newFileName);
}
- private PostPublishStep createOverwritePostPublishStep() {
+ /**
+ * Creates a {@link PostPublishStep} for overwriting partitions in the destination Iceberg table.
+ * @param srcTableSchema schema of the source Iceberg table, passed to the overwrite step
+ * so it can update the destination table's schema.
+ * @return a {@link PostPublishStep} that performs the partition overwrite operation.
+ */
+ private PostPublishStep createOverwritePostPublishStep(Schema srcTableSchema) {
IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
this.getDestIcebergTable().getTableId().toString(),
+ srcTableSchema,
this.partitionColumnName,
this.partitionColValue,
this.properties
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
index 3d3e5c1..2bcc13d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -53,6 +53,9 @@
boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
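+ // Fail fast before any copy work is planned: the schema commit itself happens later, in
+ // IcebergOverwritePartitionsStep#execute, once all data files have been copied.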
+ // This call only validates that the schema can be updated; no commit is performed here
+ destIcebergTable.updateSchema(srcIcebergTable.accessTableMetadata().schema(), true);
+
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index fab5701..0f6c371 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -36,6 +36,7 @@
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -48,6 +49,7 @@
import org.apache.iceberg.io.FileIO;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -315,4 +317,31 @@
log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
}
+ /**
+ * Updates the table's schema to the provided {@link Schema}.
+ * @param updatedSchema the schema to set on the table.
+ * @param onlyValidate if true, only validates that the schema can be updated, without committing.
+ * @throws TableNotFoundException if the table does not exist.
+ */
+ protected void updateSchema(Schema updatedSchema, boolean onlyValidate) throws TableNotFoundException {
+ TableMetadata currentTableMetadata = accessTableMetadata();
+ Schema currentSchema = currentTableMetadata.schema();
+
+ if (currentSchema.sameSchema(updatedSchema)) {
+ log.info("~{}~ schema is already up-to-date", tableId);
+ return;
+ }
+
+ log.info("~{}~ updating schema from {} to {}, commit: {}", tableId, currentSchema, updatedSchema, !onlyValidate);
+
+ TableMetadata updatedTableMetadata = currentTableMetadata.updateSchema(updatedSchema, updatedSchema.highestFieldId());
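+ // sanity-check that the rebuilt metadata actually carries the requested schema before committing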
+ Preconditions.checkArgument(updatedTableMetadata.schema().sameSchema(updatedSchema), "Schema mismatch after update, please check destination table");
+
+ if (!onlyValidate) {
+ tableOps.commit(currentTableMetadata, updatedTableMetadata);
+ tableOps.refresh();
+ log.info("~{}~ schema updated successfully", tableId);
+ }
+ }
+
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
index 7c47f2d..90dc90c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import lombok.extern.slf4j.Slf4j;
@@ -36,11 +35,7 @@
}
/**
- * Compares the metadata of the given two iceberg tables.
- * <ul>
- * <li>First compares the schema of the metadata.</li>
- * <li>Then compares the partition spec of the metadata.</li>
- * </ul>
+ * Compares the partition spec of the metadata.
* @param tableMetadataA the metadata of the first table
* @param tableMetadataB the metadata of the second table
* @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison
@@ -52,27 +47,6 @@
tableMetadataA.metadataFileLocation(),
tableMetadataB.metadataFileLocation());
- Schema schemaA = tableMetadataA.schema();
- Schema schemaB = tableMetadataB.schema();
- // TODO: Need to add support for schema evolution
- // This check needs to be broken down into multiple checks to support schema evolution
- // Possible cases - schemaA == schemaB,
- // - schemaA is subset of schemaB [ schemaB Evolved ],
- // - schemaA is superset of schemaB [ schemaA Evolved ],
- // - Other cases?
- // Also consider using Strategy or any other design pattern for this to make it a better solution
- if (!schemaA.sameSchema(schemaB)) {
- String errMsg = String.format(
- "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}",
- tableMetadataA.metadataFileLocation(),
- schemaA.schemaId(),
- tableMetadataB.metadataFileLocation(),
- schemaB.schemaId()
- );
- log.error(errMsg);
- throw new IOException(errMsg);
- }
-
PartitionSpec partitionSpecA = tableMetadataA.spec();
PartitionSpec partitionSpecB = tableMetadataB.spec();
// .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index e4c1774..634946f 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -40,6 +41,7 @@
private final String testPartitionColName = "testPartition";
private final String testPartitionColValue = "testValue";
private IcebergTable mockIcebergTable;
+ private Schema mockSchema;
private IcebergCatalog mockIcebergCatalog;
private Properties mockProperties;
private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
@@ -47,10 +49,11 @@
@BeforeMethod
public void setUp() throws IOException {
mockIcebergTable = Mockito.mock(IcebergTable.class);
+ mockSchema = Mockito.mock(Schema.class);
mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
mockProperties = new Properties();
- spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+ spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
testPartitionColName, testPartitionColValue, mockProperties));
spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
@@ -112,7 +115,7 @@
int retryCount = 7;
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
Integer.toString(retryCount));
- spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+ spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
testPartitionColName, testPartitionColValue, mockProperties));
spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
index 2cbde2d..4e97186 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
@@ -35,23 +35,6 @@
.requiredString("field1")
.requiredString("field2")
.endRecord());
- private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
- .fields()
- .requiredString("field2")
- .requiredString("field1")
- .endRecord());
- private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
- .fields()
- .requiredString("field1")
- .requiredString("field2")
- .requiredInt("field3")
- .endRecord());
- private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
- .fields()
- .requiredInt("field1")
- .requiredString("field2")
- .requiredInt("field3")
- .endRecord());
private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)
.identity("field1")
.build();
@@ -59,9 +42,6 @@
schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata(
schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>());
- private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
- schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
- private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata";
private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata";
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false;
@@ -75,64 +55,6 @@
}
@Test
- public void testValidateDifferentSchemaFails() {
- // Schema 1 and Schema 2 have different field order
-
- TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
- unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
-
- verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
- tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateSchemaWithDifferentTypesFails() {
- // schema 3 and schema 4 have different field types for field1
-
- TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
- unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
-
- verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
- tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateSchemaWithEvolvedSchemaIFails() {
- // Schema 3 has one more extra field as compared to Schema 1
- verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
- tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateSchemaWithEvolvedSchemaIIFails() {
- // TODO: This test should pass in the future when we support schema evolution
- // Schema 3 has one more extra field as compared to Schema 1
- verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
- tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
- // Adding this test as to verify that partition copy doesn't proceed further for this case
- // as while doing poc and testing had seen final commit gets fail if there is mismatch in field type
- // specially from int to long
- Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
- .fields()
- .requiredLong("field1")
- .requiredString("field2")
- .requiredInt("field3")
- .endRecord());
- PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4)
- .identity("field1")
- .build();
- TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
- partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>());
-
- verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
- tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
public void testValidateSamePartitionSpec() throws IOException {
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1,
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 90ba02b..db3bc32 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -46,6 +46,7 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -60,6 +61,8 @@
import org.apache.iceberg.hive.HiveMetastoreTest;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.apache.iceberg.types.Types;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -105,6 +108,8 @@
private final String tableName = "justtesting";
private final String destTableName = "destTable";
private TableIdentifier tableId;
+ private TableIdentifier sourceTableId;
+ private TableIdentifier destTableId;
private Table table;
private String catalogUri;
private String metadataBasePath;
@@ -126,6 +131,8 @@
@AfterMethod
public void cleanUpEachTest() {
catalog.dropTable(tableId);
+ catalog.dropTable(sourceTableId);
+ catalog.dropTable(destTableId);
}
/** Test to verify getCurrentSnapshotInfo, getAllSnapshotInfosIterator, getIncrementalSnapshotInfosIterator for iceberg table containing only data files.*/
@@ -154,6 +161,164 @@
}
}
+ @Test
+ public void schemaUpdateSuccessTest() throws IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "product_name", Types.StringType.get()),
+ Types.NestedField.required(3, "details",
+ Types.StructType.of(Types.NestedField.required(4, "description", Types.StringType.get()),
+ Types.NestedField.required(6, "category", Types.StringType.get()),
+ Types.NestedField.required(7, "remarks", Types.StringType.get()))),
+ Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+ PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("id")
+ .build();
+
+ sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+ .identity("id")
+ .build();
+
+ destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+ catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
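+ // spy on the destination table's operations to verify whether a metadata commit occurs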
+ TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // update the schema and verify the update is committed
+ destIcebergTable.updateSchema(srcTableSchema, false);
+ Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(), Mockito.any());
+ }
+
+ @Test
+ public void schemaUpdateTest_divergentSchema() throws IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "product_name", Types.StringType.get()),
+ Types.NestedField.required(3, "details",
+ Types.StructType.of(Types.NestedField.required(4, "description", Types.StringType.get()),
+ Types.NestedField.required(6, "category", Types.StringType.get()),
+ Types.NestedField.required(7, "remarks", Types.StringType.get()))),
+ Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+ PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("id")
+ .build();
+
+ sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "randomField", Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+ .identity("randomField")
+ .build();
+
+ destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+ catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // update to the divergent source schema and verify a commit still occurs
+ destIcebergTable.updateSchema(srcTableSchema, false);
+ Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(), Mockito.any());
+ }
+
+ @Test
+ public void schemaUpdateTest_sameSchema_noOpTest() throws IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "randomField", Types.LongType.get()));
+
+ PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("randomField")
+ .build();
+
+ sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "randomField", Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+ .identity("randomField")
+ .build();
+
+ destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+ catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // updating to an identical schema should be a no-op: no commit expected
+ destIcebergTable.updateSchema(srcTableSchema, false);
+ Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(), Mockito.any());
+ }
+
+ @Test
+ public void schemaUpdate_onlyValidationTest() throws IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "product_name", Types.StringType.get()),
+ Types.NestedField.required(3, "details",
+ Types.StructType.of(Types.NestedField.required(4, "description", Types.StringType.get()),
+ Types.NestedField.required(6, "category", Types.StringType.get()),
+ Types.NestedField.required(7, "remarks", Types.StringType.get()))),
+ Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+ PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("id")
+ .build();
+
+ sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+ .identity("id")
+ .build();
+
+ destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+ catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // validate-only mode must not commit the schema update
+ destIcebergTable.updateSchema(srcTableSchema, true);
+ Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(), Mockito.any());
+ }
+
@DataProvider(name = "isPosDeleteProvider")
public Object[][] isPosDeleteProvider() {
return new Object[][] {{true}, {false}};