Evolve Iceberg table schema for partition copy (#4142)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index 30dc209..e7c83ee 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.ExecutionException;
 
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 import com.github.rholder.retry.Attempt;
@@ -57,6 +58,14 @@
 @Slf4j
 public class IcebergOverwritePartitionsStep implements CommitStep {
   private final String destTableIdStr;
+  /**
+   * The catalog must enforce that the updated schema is backwards compatible during the
+   * partition overwrite step; otherwise previously written partitions may become unreadable.
+   * This schema is captured at the start of the copy job, before data file discovery, and is
+   * committed before the partitions are overwritten. It is the catalog's responsibility to
+   * ensure that no conflicting commits happen while the copy job runs.
+   */
+  private final Schema updatedSchema;
   private final Properties properties;
   // data files are populated once all the copy tasks are done. Each IcebergPartitionCopyableFile has a serialized data file
   @Setter private List<DataFile> dataFiles;
@@ -64,6 +73,8 @@
   private final String partitionValue;
   public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
       ".catalog.overwrite.partitions.retries";
+  public static final String SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
+      ".schema.update.retries";
   private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
       RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
       RETRY_TIMES, 3,
@@ -75,8 +86,9 @@
    * @param destTableIdStr the identifier of the destination table as a string
    * @param properties the properties containing configuration
    */
-  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Properties properties) {
+  public IcebergOverwritePartitionsStep(String destTableIdStr, Schema updatedSchema, String partitionColName, String partitionValue, Properties properties) {
     this.destTableIdStr = destTableIdStr;
+    this.updatedSchema = updatedSchema;
     this.partitionColName = partitionColName;
     this.partitionValue = partitionValue;
     this.properties = properties;
@@ -108,6 +120,13 @@
           dataFiles.size(),
           dataFiles.get(0).path()
       );
+      // update schema
+      Retryer<Void> schemaUpdateRetryer = createSchemaUpdateRetryer();
+      schemaUpdateRetryer.call(() -> {
+        destTable.updateSchema(this.updatedSchema, false);
+        return null;
+      });
+      // overwrite partitions
       Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
       overwritePartitionsRetryer.call(() -> {
         destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
@@ -146,17 +165,30 @@
         ? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
         : ConfigFactory.empty();
 
+    return getRetryer(retryerOverridesConfig);
+  }
+
+  private Retryer<Void> createSchemaUpdateRetryer() {
+    Config config = ConfigFactory.parseProperties(this.properties);
+    Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+        ? config.getConfig(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+        : ConfigFactory.empty();
+
+    return getRetryer(retryerOverridesConfig);
+  }
+
+  private Retryer<Void> getRetryer(Config retryerOverridesConfig) {
     return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() {
-      @Override
-      public <V> void onRetry(Attempt<V> attempt) {
-        if (attempt.hasException()) {
-          String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]",
-              destTableIdStr,
-              attempt.getAttemptNumber(),
-              Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
-          log.warn(msg, attempt.getExceptionCause());
-        }
-      }
-    }));
+          @Override
+          public <V> void onRetry(Attempt<V> attempt) {
+            if (attempt.hasException()) {
+              String msg = String.format("~%s~ Exception occurred [attempt: %d; elapsed: %s]",
+                  destTableIdStr,
+                  attempt.getAttemptNumber(),
+                  Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+              log.warn(msg, attempt.getExceptionCause());
+            }
+          }
+        }));
   }
 }
\ No newline at end of file
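
A sketch of how the new schema-update retryer could be tuned, mirroring the pattern the existing tests use for the overwrite retryer; the literal property key behind RETRY_TIMES comes from Gobblin's RetryerFactory, and the attempt count shown is illustrative:

    Properties props = new Properties();
    // raise the retry attempts for the schema-update step (illustrative value)
    props.setProperty(
        IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
        Integer.toString(5));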
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 5600b47..1cc15c1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -37,6 +37,7 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
@@ -98,6 +99,7 @@
     //  Differences are getting data files, copying ancestor permission and adding post publish steps
     String fileSet = this.getFileSetId();
     IcebergTable srcIcebergTable = getSrcIcebergTable();
+    Schema srcTableSchema = srcIcebergTable.accessTableMetadata().schema();
     List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
 
     if (srcDataFiles.isEmpty()) {
@@ -128,7 +130,7 @@
     List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs, srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec, copyConfig);
     // Adding this check to avoid adding post publish step when there are no files to copy.
     if (CollectionUtils.isNotEmpty(copyEntities)) {
-      copyEntities.add(createOverwritePostPublishStep());
+      copyEntities.add(createOverwritePostPublishStep(srcTableSchema));
     }
 
     log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
@@ -191,9 +193,16 @@
     return new Path(fileDir, newFileName);
   }
 
-  private PostPublishStep createOverwritePostPublishStep() {
+  /**
+   * Creates a {@link PostPublishStep} for overwriting partitions in the destination Iceberg table.
+   * @param srcTableSchema schema of the source Iceberg table, passed to the overwrite step
+   *                       so it can update the destination table's schema.
+   * @return a {@link PostPublishStep} that performs the partition overwrite operation.
+   */
+  private PostPublishStep createOverwritePostPublishStep(Schema srcTableSchema) {
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
+        srcTableSchema,
         this.partitionColumnName,
         this.partitionColValue,
         this.properties
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
index 3d3e5c1..2bcc13d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -53,6 +53,9 @@
     boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
         DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
 
+    // This call only validates that the schema can be updated; no commit is performed here
+    destIcebergTable.updateSchema(srcIcebergTable.accessTableMetadata().schema(), true);
+
     IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
         srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
 
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index fab5701..0f6c371 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -36,6 +36,7 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -48,6 +49,7 @@
 import org.apache.iceberg.io.FileIO;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -315,4 +317,31 @@
     log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
   }
 
+  /**
+   * Updates the table's schema to the provided {@link Schema}.
+   * @param updatedSchema the updated schema to be set on the table.
+   * @param onlyValidate if true, only validates that the schema can be updated, without committing.
+   * @throws TableNotFoundException if the table does not exist.
+   */
+  protected void updateSchema(Schema updatedSchema, boolean onlyValidate) throws TableNotFoundException {
+    TableMetadata currentTableMetadata = accessTableMetadata();
+    Schema currentSchema = currentTableMetadata.schema();
+
+    if (currentSchema.sameSchema(updatedSchema)) {
+      log.info("~{}~ schema is already up-to-date", tableId);
+      return;
+    }
+
+    log.info("~{}~ updating schema from {} to {}, commit: {}", tableId, currentSchema, updatedSchema, !onlyValidate);
+
+    TableMetadata updatedTableMetadata = currentTableMetadata.updateSchema(updatedSchema, updatedSchema.highestFieldId());
+    Preconditions.checkArgument(updatedTableMetadata.schema().sameSchema(updatedSchema), "Schema mismatch after update; please check the destination table");
+
+    if (!onlyValidate) {
+      tableOps.commit(currentTableMetadata, updatedTableMetadata);
+      tableOps.refresh();
+      log.info("~{}~ schema updated successfully", tableId);
+    }
+  }
+
 }
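
A minimal sketch of the two-phase contract introduced above, assuming a destination IcebergTable handle (destIcebergTable) and the source schema (srcTableSchema) captured at job start:

    // planning time (IcebergPartitionDatasetFinder): fail fast if the schema cannot be evolved
    destIcebergTable.updateSchema(srcTableSchema, true);   // onlyValidate = true, no commit
    // ... copy the partition data files ...
    // publish time (IcebergOverwritePartitionsStep): commit the schema, then overwrite partitions
    destIcebergTable.updateSchema(srcTableSchema, false);  // commits via tableOps.commit(...)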
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
index 7c47f2d..90dc90c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
 
 import lombok.extern.slf4j.Slf4j;
@@ -36,11 +35,7 @@
   }
 
   /**
-   * Compares the metadata of the given two iceberg tables.
-   * <ul>
-   *   <li>First compares the schema of the metadata.</li>
-   *   <li>Then compares the partition spec of the metadata.</li>
-   * </ul>
+   * Compares the partition specs of the two given table metadata instances.
    * @param tableMetadataA  the metadata of the first table
    * @param tableMetadataB the metadata of the second table
    * @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison
@@ -52,27 +47,6 @@
         tableMetadataA.metadataFileLocation(),
         tableMetadataB.metadataFileLocation());
 
-    Schema schemaA = tableMetadataA.schema();
-    Schema schemaB = tableMetadataB.schema();
-    // TODO: Need to add support for schema evolution
-    //  This check needs to be broken down into multiple checks to support schema evolution
-    //  Possible cases - schemaA == schemaB,
-    //  - schemaA is subset of schemaB [ schemaB Evolved ],
-    //  - schemaA is superset of schemaB [ schemaA Evolved ],
-    //  - Other cases?
-    //  Also consider using Strategy or any other design pattern for this to make it a better solution
-    if (!schemaA.sameSchema(schemaB)) {
-      String errMsg = String.format(
-          "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}",
-          tableMetadataA.metadataFileLocation(),
-          schemaA.schemaId(),
-          tableMetadataB.metadataFileLocation(),
-          schemaB.schemaId()
-      );
-      log.error(errMsg);
-      throw new IOException(errMsg);
-    }
-
     PartitionSpec partitionSpecA = tableMetadataA.spec();
     PartitionSpec partitionSpecB = tableMetadataB.spec();
     // .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does
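
With the schema check removed, failUnlessCompatibleStructure now only guards the partition spec; a usage sketch, with the strictness flag wired up as in IcebergPartitionDatasetFinder above:

    // strict equality (true) also matches specIds/fieldIds; false accepts compatible specs
    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
        srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), true);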
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index e4c1774..634946f 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -24,6 +24,7 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -40,6 +41,7 @@
   private final String testPartitionColName = "testPartition";
   private final String testPartitionColValue = "testValue";
   private IcebergTable mockIcebergTable;
+  private Schema mockSchema;
   private IcebergCatalog mockIcebergCatalog;
   private Properties mockProperties;
   private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
@@ -47,10 +49,11 @@
   @BeforeMethod
   public void setUp() throws IOException {
     mockIcebergTable = Mockito.mock(IcebergTable.class);
+    mockSchema = Mockito.mock(Schema.class);
     mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
     mockProperties = new Properties();
 
-    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
         testPartitionColName, testPartitionColValue, mockProperties));
 
     spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
@@ -112,7 +115,7 @@
     int retryCount = 7;
     mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
         Integer.toString(retryCount));
-    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
         testPartitionColName, testPartitionColValue, mockProperties));
 
     spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
index 2cbde2d..4e97186 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
@@ -35,23 +35,6 @@
       .requiredString("field1")
       .requiredString("field2")
       .endRecord());
-  private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
-      .fields()
-      .requiredString("field2")
-      .requiredString("field1")
-      .endRecord());
-  private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
-      .fields()
-      .requiredString("field1")
-      .requiredString("field2")
-      .requiredInt("field3")
-      .endRecord());
-  private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
-      .fields()
-      .requiredInt("field1")
-      .requiredString("field2")
-      .requiredInt("field3")
-      .endRecord());
   private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)
       .identity("field1")
       .build();
@@ -59,9 +42,6 @@
       schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
   private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata(
       schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>());
-  private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
-      schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
-  private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata";
   private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata";
   private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
   private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false;
@@ -75,64 +55,6 @@
   }
 
   @Test
-  public void testValidateDifferentSchemaFails() {
-    // Schema 1 and Schema 2 have different field order
-
-    TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
-        unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
-
-    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
-        tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateSchemaWithDifferentTypesFails() {
-    // schema 3 and schema 4 have different field types for field1
-
-    TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
-        unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
-
-    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
-        tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateSchemaWithEvolvedSchemaIFails() {
-    // Schema 3 has one more extra field as compared to Schema 1
-    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
-        tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateSchemaWithEvolvedSchemaIIFails() {
-    // TODO: This test should pass in the future when we support schema evolution
-    // Schema 3 has one more extra field as compared to Schema 1
-    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
-        tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
-    // Adding this test as to verify that partition copy doesn't proceed further for this case
-    // as while doing poc and testing had seen final commit gets fail if there is mismatch in field type
-    // specially from int to long
-    Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
-        .fields()
-        .requiredLong("field1")
-        .requiredString("field2")
-        .requiredInt("field3")
-        .endRecord());
-    PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4)
-        .identity("field1")
-        .build();
-    TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
-        partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>());
-
-    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
-        tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
   public void testValidateSamePartitionSpec() throws IOException {
     IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
         tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1,
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 90ba02b..db3bc32 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -46,6 +46,7 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -60,6 +61,8 @@
 import org.apache.iceberg.hive.HiveMetastoreTest;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.apache.iceberg.types.Types;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -105,6 +108,8 @@
   private final String tableName = "justtesting";
   private final String destTableName = "destTable";
   private TableIdentifier tableId;
+  private TableIdentifier sourceTableId;
+  private TableIdentifier destTableId;
   private Table table;
   private String catalogUri;
   private String metadataBasePath;
@@ -126,6 +131,8 @@
   @AfterMethod
   public void cleanUpEachTest() {
     catalog.dropTable(tableId);
+    // these ids are only assigned by the schema-update tests, so guard against null
+    if (sourceTableId != null) catalog.dropTable(sourceTableId);
+    if (destTableId != null) catalog.dropTable(destTableId);
   }
 
   /** Test to verify getCurrentSnapshotInfo, getAllSnapshotInfosIterator, getIncrementalSnapshotInfosIterator for iceberg table containing only data files.*/
@@ -154,6 +161,164 @@
     }
   }
 
+  @Test
+  public void schemaUpdateSuccessTest() throws IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()),
+        Types.NestedField.required(2, "product_name", Types.StringType.get()),
+        Types.NestedField.required(3, "details",
+            Types.StructType.of(Types.NestedField.required(4, "description", Types.StringType.get()),
+                Types.NestedField.required(6, "category", Types.StringType.get()),
+                Types.NestedField.required(7, "remarks", Types.StringType.get()))),
+        Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+    PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("id")
+        .build();
+
+    sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+        .identity("id")
+        .build();
+
+    destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+    catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // update the schema and verify the update is committed
+    destIcebergTable.updateSchema(srcTableSchema, false);
+    Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(), Mockito.any());
+  }
+
+  @Test
+  public void schemaUpdateTest_divergentSchema() throws IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()),
+        Types.NestedField.required(2, "product_name", Types.StringType.get()),
+        Types.NestedField.required(3, "details",
+            Types.StructType.of(Types.NestedField.required(4, "description", Types.StringType.get()),
+                Types.NestedField.required(6, "category", Types.StringType.get()),
+                Types.NestedField.required(7, "remarks", Types.StringType.get()))),
+        Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+    PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("id")
+        .build();
+
+    sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "randomField", Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+        .identity("randomField")
+        .build();
+
+    destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+    catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // update the schema with the divergent source schema and verify the commit occurs
+    destIcebergTable.updateSchema(srcTableSchema, false);
+    Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(), Mockito.any());
+  }
+
+  @Test
+  public void schemaUpdateTest_sameSchema_noOpTest() throws IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "randomField", Types.LongType.get()));
+
+    PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("randomField")
+        .build();
+
+    sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "randomField", Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+        .identity("randomField")
+        .build();
+
+    destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+    catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // schemas already match, so the update should be a no-op with no commit
+    destIcebergTable.updateSchema(srcTableSchema, false);
+    Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(), Mockito.any());
+  }
+
+  @Test
+  public void schemaUpdate_onlyValidationTest() throws IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()),
+        Types.NestedField.required(2, "product_name", Types.StringType.get()),
+        Types.NestedField.required(3, "details",
+            Types.StructType.of(Types.NestedField.required(4, "description", Types.StringType.get()),
+                Types.NestedField.required(6, "category", Types.StringType.get()),
+                Types.NestedField.required(7, "remarks", Types.StringType.get()))),
+        Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+    PartitionSpec sourceIcebergPartitionSpec = PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("id")
+        .build();
+
+    sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = PartitionSpec.builderFor(destIcebergSchema)
+        .identity("id")
+        .build();
+
+    destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+    catalog.createTable(destTableId, destIcebergSchema, destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // validate-only mode must not commit the schema update
+    destIcebergTable.updateSchema(srcTableSchema, true);
+    Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(), Mockito.any());
+  }
+
   @DataProvider(name = "isPosDeleteProvider")
   public Object[][] isPosDeleteProvider() {
     return new Object[][] {{true}, {false}};