[750] Fix schema sync for Iceberg tables (#749)
* fix initial schema sync
* update schema sync
* spotless
* update RunSync test to hit the field ID case
* add basic unit test, fix test expectation
* style
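
The underlying problem: when the source schema carries explicit field IDs (for example a Hudi table written with the XTable field ID extensions), both table creation and schema sync previously let Iceberg reassign those IDs. The patch creates the Iceberg table with an empty schema and then commits the provided schema directly through the TableOperations API, and syncSchema takes the same operations-level path whenever field IDs are present. A minimal sketch of the creation workaround, assuming an existing Catalog, TableIdentifier, base path, and table properties (names illustrative, not part of the patch):

    import java.util.Map;

    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableOperations;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.types.Types;

    class FieldIdPreservingCreateSketch {
      // Create the table with an empty schema, then commit the caller-provided schema so the
      // field IDs it carries (3, 1, ...) are kept instead of being renumbered at create time.
      static Table createPreservingIds(
          Catalog catalog, TableIdentifier id, String basePath, Map<String, String> props) {
        Schema provided =
            new Schema(
                Types.NestedField.required(3, "timestamp_field", Types.TimestampType.withoutZone()),
                Types.NestedField.required(1, "group_id", Types.IntegerType.get()));
        Table created =
            catalog.createTable(id, new Schema(), PartitionSpec.unpartitioned(), basePath, props);
        TableOperations ops = ((BaseTable) created).operations();
        TableMetadata base = ops.current();
        ops.commit(
            base,
            TableMetadata.buildFrom(base)
                .setCurrentSchema(provided, provided.highestFieldId())
                .build());
        return catalog.loadTable(id);
      }
    }

The extra operations-level commit produces one more metadata.json per table, which lines up with the expected metadata file counts in ITRunSync and ITRunCatalogSync moving from 2 to 3 below.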
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index a57ac4f..b05089d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -164,7 +164,22 @@
   @Override
   public void syncSchema(InternalSchema schema) {
     Schema latestSchema = schemaExtractor.toIceberg(schema);
-    schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
+    if (!transaction.table().schema().sameSchema(latestSchema)) {
+      boolean hasFieldIds =
+          schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null);
+      if (hasFieldIds) {
+        // There is no clean way to sync the schema with the provided field IDs using the
+        // transaction API, so we commit the current transaction and interact directly with
+        // the operations API.
+        transaction.commitTransaction();
+        schemaSync.syncWithProvidedIds(latestSchema, table);
+        // Start a new transaction for remaining operations
+        table.refresh();
+        transaction = table.newTransaction();
+      } else {
+        schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
+      }
+    }
   }
 
   @Override
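
The guard added above relies on Schema.sameSchema, which compares field IDs in addition to names and types, so a source schema whose IDs differ from the table's current IDs still triggers a sync even when every column name and type matches. A small illustrative sketch (plain Iceberg API, not part of the patch):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    class SameSchemaIdSketch {
      public static void main(String[] args) {
        // identical names and types, but the field IDs are swapped
        Schema current =
            new Schema(
                Types.NestedField.required(1, "group_id", Types.IntegerType.get()),
                Types.NestedField.required(2, "date_field", Types.DateType.get()));
        Schema fromSource =
            new Schema(
                Types.NestedField.required(2, "group_id", Types.IntegerType.get()),
                Types.NestedField.required(1, "date_field", Types.DateType.get()));
        // prints false: the schemas differ by ID only, so syncSchema still has work to do
        System.out.println(current.sameSchema(fromSource));
      }
    }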
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
index 800938c..4b57056 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
@@ -28,7 +28,10 @@
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.types.Types;
@@ -58,6 +61,14 @@
     }
   }
 
+  public void syncWithProvidedIds(Schema latest, Table table) {
+    BaseTable baseTable = (BaseTable) table;
+    TableMetadata current = baseTable.operations().current();
+    TableMetadata updated =
+        TableMetadata.buildFrom(current).setCurrentSchema(latest, latest.highestFieldId()).build();
+    baseTable.operations().commit(current, updated);
+  }
+
   /**
    * Return a mapping of fieldId in the latest schema to an update action to perform. This allows
    * updates to happen in the same order as the source system.
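
syncWithProvidedIds goes through the low-level TableOperations API because UpdateSchema assigns IDs for new columns from the table's own last-column-id counter, so there is no way to make it honor IDs supplied by the source system. A short sketch of the API being avoided (illustrative column name, not part of the patch):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.types.Types;

    class UpdateSchemaIdSketch {
      // UpdateSchema picks the new column's ID itself (next value of the table's last-column-id),
      // which is why the provided-ID path above commits the schema through TableOperations instead.
      static void evolveWithAssignedIds(Table table) {
        table.updateSchema().addColumn("note", Types.StringType.get()).commit();
      }
    }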
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
index 431184c..06b625c 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
@@ -29,11 +29,13 @@
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -75,24 +77,34 @@
       return getTable(catalogConfig, tableIdentifier, basePath);
     } else {
       try {
-        return getCatalog(catalogConfig)
-            .map(
-                catalog ->
-                    catalog.createTable(
-                        tableIdentifier,
-                        schema,
-                        partitionSpec,
-                        basePath,
-                        getDefaultMappingProperties(schema)))
-            .orElseGet(
-                () ->
-                    getHadoopTables()
-                        .create(
-                            schema,
-                            partitionSpec,
-                            SortOrder.unsorted(),
-                            getDefaultMappingProperties(schema),
-                            basePath));
+        // initialize the table with an empty schema, then manually set the schema to prevent the
+        // Iceberg API from remapping the field IDs.
+        Table tableWithEmptySchema =
+            getCatalog(catalogConfig)
+                .map(
+                    catalog ->
+                        catalog.createTable(
+                            tableIdentifier,
+                            new Schema(),
+                            PartitionSpec.unpartitioned(),
+                            basePath,
+                            getDefaultMappingProperties(schema)))
+                .orElseGet(
+                    () ->
+                        getHadoopTables()
+                            .create(
+                                new Schema(),
+                                PartitionSpec.unpartitioned(),
+                                getDefaultMappingProperties(schema),
+                                basePath));
+        // set the schema with the provided field IDs
+        TableOperations operations = ((BaseTable) tableWithEmptySchema).operations();
+        TableMetadata tableMetadata = operations.current();
+        TableMetadata.Builder builder = TableMetadata.buildFrom(tableMetadata);
+        builder.setCurrentSchema(schema, schema.highestFieldId());
+        builder.setDefaultPartitionSpec(partitionSpec);
+        operations.commit(tableMetadata, builder.build());
+        return getTable(catalogConfig, tableIdentifier, basePath);
       } catch (AlreadyExistsException ex) {
         log.info("Table {} not created since it already exists", tableIdentifier);
         return getTable(catalogConfig, tableIdentifier, basePath);
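
With the empty-schema workaround above, a freshly created table should report exactly the source-provided field IDs when loaded back; the TestIcebergSync changes below assert this via expectedSchema.sameSchema(icebergTable.schema()). A small verification sketch, assuming a HadoopCatalog over the warehouse directory (names illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    class CreatedTableIdCheckSketch {
      static void printGroupIdFieldId(String warehouseLocation, String tableName) throws Exception {
        try (HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehouseLocation)) {
          Table table = catalog.loadTable(TableIdentifier.of(Namespace.empty(), tableName));
          // with the workaround this prints the ID assigned by the source (e.g. 1 for group_id),
          // not an ID renumbered by the catalog when the table was created
          System.out.println(table.schema().findField("group_id").fieldId());
        }
      }
    }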
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 89460c4..8295ce5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -590,7 +590,10 @@
 
   @SneakyThrows
   protected HoodieTableMetaClient getMetaClient(
-      TypedProperties keyGenProperties, HoodieTableType hoodieTableType, Configuration conf) {
+      TypedProperties keyGenProperties,
+      HoodieTableType hoodieTableType,
+      Configuration conf,
+      boolean populateMetaFields) {
     LocalFileSystem fs = (LocalFileSystem) FSUtils.getFs(basePath, conf);
     // Enforce checksum such that fs.open() is consistent to DFS
     fs.setVerifyChecksum(true);
@@ -614,6 +617,7 @@
             .setPayloadClass(OverwriteWithLatestAvroPayload.class)
             .setCommitTimezone(HoodieTimelineTimeZone.UTC)
             .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+            .setPopulateMetaFields(populateMetaFields)
             .build();
     return HoodieTableMetaClient.initTableAndGetMetaClient(conf, this.basePath, properties);
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index abbe7fe..2f5b73e 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -66,6 +66,7 @@
   private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
 
   private final Configuration conf;
+  private final boolean addFieldIds;
 
   /**
    * Create a test table instance for general testing. The table is created with the schema defined
@@ -83,7 +84,13 @@
   public static TestJavaHudiTable forStandardSchema(
       String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
     return new TestJavaHudiTable(
-        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null);
+        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, false);
+  }
+
+  public static TestJavaHudiTable forStandardSchemaWithFieldIds(
+      String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
+    return new TestJavaHudiTable(
+        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, true);
   }
 
   public static TestJavaHudiTable forStandardSchema(
@@ -93,7 +100,7 @@
       HoodieTableType tableType,
       HoodieArchivalConfig archivalConfig) {
     return new TestJavaHudiTable(
-        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig);
+        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig, false);
   }
 
   /**
@@ -119,7 +126,20 @@
         tempDir,
         partitionConfig,
         tableType,
-        null);
+        null,
+        false);
+  }
+
+  public static TestJavaHudiTable withAdditionalColumnsAndFieldIds(
+      String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
+    return new TestJavaHudiTable(
+        tableName,
+        addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
+        tempDir,
+        partitionConfig,
+        tableType,
+        null,
+        true);
   }
 
   public static TestJavaHudiTable withAdditionalTopLevelField(
@@ -129,7 +149,13 @@
       HoodieTableType tableType,
       Schema previousSchema) {
     return new TestJavaHudiTable(
-        tableName, addTopLevelField(previousSchema), tempDir, partitionConfig, tableType, null);
+        tableName,
+        addTopLevelField(previousSchema),
+        tempDir,
+        partitionConfig,
+        tableType,
+        null,
+        false);
   }
 
   public static TestJavaHudiTable withSchema(
@@ -138,7 +164,8 @@
       String partitionConfig,
       HoodieTableType tableType,
       Schema schema) {
-    return new TestJavaHudiTable(tableName, schema, tempDir, partitionConfig, tableType, null);
+    return new TestJavaHudiTable(
+        tableName, schema, tempDir, partitionConfig, tableType, null, false);
   }
 
   private TestJavaHudiTable(
@@ -147,10 +174,12 @@
       Path tempDir,
       String partitionConfig,
       HoodieTableType hoodieTableType,
-      HoodieArchivalConfig archivalConfig) {
+      HoodieArchivalConfig archivalConfig,
+      boolean addFieldIds) {
     super(name, schema, tempDir, partitionConfig);
     this.conf = new Configuration();
     this.conf.set("parquet.avro.write-old-list-structure", "false");
+    this.addFieldIds = addFieldIds;
     try {
       this.metaClient = initMetaClient(hoodieTableType, typedProperties);
     } catch (IOException ex) {
@@ -297,13 +326,14 @@
 
   private HoodieTableMetaClient initMetaClient(
       HoodieTableType hoodieTableType, TypedProperties keyGenProperties) throws IOException {
-    return getMetaClient(keyGenProperties, hoodieTableType, conf);
+    return getMetaClient(keyGenProperties, hoodieTableType, conf, !addFieldIds);
   }
 
   private HoodieJavaWriteClient<HoodieAvroPayload> initJavaWriteClient(
       Schema schema, TypedProperties keyGenProperties, HoodieArchivalConfig archivalConfig) {
     HoodieWriteConfig writeConfig =
         HoodieWriteConfig.newBuilder()
+            .withPopulateMetaFields(!addFieldIds)
             .withProperties(generateWriteConfig(schema, keyGenProperties).getProps())
             .withClusteringConfig(
                 HoodieClusteringConfig.newBuilder()
@@ -321,6 +351,18 @@
               .withArchivalConfig(archivalConfig)
               .build();
     }
+    if (addFieldIds) {
+      writeConfig
+          .getProps()
+          .put(
+              "hoodie.avro.write.support.class",
+              "org.apache.xtable.hudi.extensions.HoodieAvroWriteSupportWithFieldIds");
+      writeConfig
+          .getProps()
+          .put(
+              "hoodie.client.init.callback.classes",
+              "org.apache.xtable.hudi.extensions.AddFieldIdsClientInitCallback");
+    }
     HoodieEngineContext context = new HoodieJavaEngineContext(conf);
     return new HoodieJavaWriteClient<>(context, writeConfig);
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
index 79316f5..1aaf61f 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
@@ -271,6 +271,6 @@
 
   private HoodieTableMetaClient initMetaClient(
       JavaSparkContext jsc, HoodieTableType hoodieTableType, TypedProperties keyGenProperties) {
-    return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration());
+    return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration(), true);
   }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index ab13ae2..ffe6a21 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -364,7 +364,7 @@
             .schema(csSchema)
             .createWriterFunc(GenericParquetWriter::buildWriter)
             .overwrite()
-            .withSpec(csPartitionSpec)
+            .withSpec(table.spec())
             .withPartition(partitionInfo)
             .build();
 
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
index 9825459..b07fac4 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
@@ -18,6 +18,7 @@
  
 package org.apache.xtable.iceberg;
 
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -29,9 +30,12 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.InOrder;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.types.Type;
@@ -335,6 +339,25 @@
     verify(mockUpdateSchema).commit();
   }
 
+  @Test
+  void testSyncWithProvidedIds() {
+    BaseTable mockBaseTable = Mockito.mock(BaseTable.class, RETURNS_DEEP_STUBS);
+    TableMetadata mockCurrent = Mockito.mock(TableMetadata.class);
+    when(mockBaseTable.operations().current()).thenReturn(mockCurrent);
+    try (MockedStatic<TableMetadata> tableMetadataMockedStatic =
+        Mockito.mockStatic(TableMetadata.class)) {
+      TableMetadata.Builder mockBuilder = Mockito.mock(TableMetadata.Builder.class);
+      tableMetadataMockedStatic
+          .when(() -> TableMetadata.buildFrom(mockCurrent))
+          .thenReturn(mockBuilder);
+      when(mockBuilder.setCurrentSchema(SCHEMA, SCHEMA.highestFieldId())).thenReturn(mockBuilder);
+      TableMetadata mockUpdated = Mockito.mock(TableMetadata.class);
+      when(mockBuilder.build()).thenReturn(mockUpdated);
+      schemaSync.syncWithProvidedIds(SCHEMA, mockBaseTable);
+      verify(mockBaseTable.operations()).commit(mockCurrent, mockUpdated);
+    }
+  }
+
   private Schema addColumnToDefault(Schema schema, Types.NestedField field, Integer parentId) {
     List<Types.NestedField> fields = new ArrayList<>();
     for (Types.NestedField existingField : schema.columns()) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index c02d7f2..d5a25b0 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -170,9 +170,9 @@
           .build();
   private final Schema icebergSchema =
       new Schema(
-          Types.NestedField.required(1, "timestamp_field", Types.TimestampType.withoutZone()),
+          Types.NestedField.required(3, "timestamp_field", Types.TimestampType.withoutZone()),
           Types.NestedField.required(2, "date_field", Types.DateType.get()),
-          Types.NestedField.required(3, "group_id", Types.IntegerType.get()),
+          Types.NestedField.required(1, "group_id", Types.IntegerType.get()),
           Types.NestedField.required(
               4,
               "record",
@@ -244,11 +244,13 @@
 
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
-    validateIcebergTable(tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null);
+    validateIcebergTable(
+        tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null, icebergSchema);
 
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
-    validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null);
+    validateIcebergTable(
+        tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null, icebergSchema);
 
     ArgumentCaptor<Transaction> transactionArgumentCaptor =
         ArgumentCaptor.forClass(Transaction.class);
@@ -256,7 +258,7 @@
     ArgumentCaptor<PartitionSpec> partitionSpecArgumentCaptor =
         ArgumentCaptor.forClass(PartitionSpec.class);
 
-    verify(mockSchemaSync, times(2))
+    verify(mockSchemaSync, times(1))
         .sync(
             schemaArgumentCaptor.capture(),
             schemaArgumentCaptor.capture(),
@@ -274,13 +276,9 @@
     assertTrue(
         partitionSpecSchemaArgumentCaptor.getAllValues().stream()
             .allMatch(capturedSchema -> capturedSchema.sameSchema(icebergSchema)));
-    // schema sync args for first iteration
-    assertTrue(
-        schemaArgumentCaptor.getAllValues().subList(0, 2).stream()
-            .allMatch(capturedSchema -> capturedSchema.sameSchema(icebergSchema)));
     // second snapshot sync will evolve the schema
-    assertTrue(schemaArgumentCaptor.getAllValues().get(2).sameSchema(icebergSchema));
-    assertTrue(schemaArgumentCaptor.getAllValues().get(3).sameSchema(icebergSchema2));
+    assertTrue(schemaArgumentCaptor.getAllValues().get(0).sameSchema(icebergSchema));
+    assertTrue(schemaArgumentCaptor.getAllValues().get(1).sameSchema(icebergSchema2));
     // check that the correct partition spec is used in calls to the mocks
     assertTrue(
         partitionSpecArgumentCaptor.getAllValues().stream()
@@ -292,9 +290,6 @@
     assertSame(
         transactionArgumentCaptor.getAllValues().get(0),
         transactionArgumentCaptor.getAllValues().get(2));
-    assertSame(
-        transactionArgumentCaptor.getAllValues().get(1),
-        transactionArgumentCaptor.getAllValues().get(3));
     // validate that transactions are different between runs
     assertNotSame(
         transactionArgumentCaptor.getAllValues().get(1),
@@ -358,7 +353,8 @@
     // get a new iceberg sync to make sure table is re-read from disk and no metadata is cached
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot3);
-    validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile3, dataFile4), null);
+    validateIcebergTable(
+        tableName, table2, Sets.newHashSet(dataFile3, dataFile4), null, icebergSchema);
     // Validate Iceberg table state
     Table table = getTable(basePath);
     assertEquals(4, table.history().size());
@@ -425,7 +421,8 @@
         Expressions.and(
             Expressions.greaterThanOrEqual(
                 partitionField.getSourceField().getName(), "2022-10-01T00:00"),
-            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02T00:00")));
+            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02T00:00")),
+        icebergSchema);
   }
 
   @Test
@@ -485,7 +482,8 @@
         Sets.newHashSet(dataFile1, dataFile2),
         Expressions.and(
             Expressions.greaterThanOrEqual(partitionField.getSourceField().getName(), "2022-10-01"),
-            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02")));
+            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02")),
+        icebergSchema);
   }
 
   @Test
@@ -539,7 +537,8 @@
         Sets.newHashSet(dataFile1, dataFile2),
         Expressions.and(
             Expressions.greaterThanOrEqual(partitionField.getSourceField().getName(), 1),
-            Expressions.lessThan(partitionField.getSourceField().getName(), 2)));
+            Expressions.lessThan(partitionField.getSourceField().getName(), 2)),
+        icebergSchema);
   }
 
   @Test
@@ -619,7 +618,8 @@
                 Expressions.greaterThanOrEqual(
                     partitionField2.getSourceField().getName(), "2022-10-01T00:00"),
                 Expressions.lessThan(
-                    partitionField2.getSourceField().getName(), "2022-10-02T00:00"))));
+                    partitionField2.getSourceField().getName(), "2022-10-02T00:00"))),
+        icebergSchema);
   }
 
   @Test
@@ -678,7 +678,8 @@
         tableName,
         table,
         Sets.newHashSet(dataFile1, dataFile2),
-        Expressions.equal(partitionField.getSourceField().getPath(), "value1"));
+        Expressions.equal(partitionField.getSourceField().getPath(), "value1"),
+        icebergSchema);
   }
 
   @Test
@@ -822,13 +823,16 @@
       String tableName,
       InternalTable table,
       Set<InternalDataFile> expectedFiles,
-      Expression filterExpression)
+      Expression filterExpression,
+      Schema expectedSchema)
       throws IOException {
     Path warehouseLocation = Paths.get(table.getBasePath()).getParent();
     try (HadoopCatalog catalog = new HadoopCatalog(CONFIGURATION, warehouseLocation.toString())) {
       TableIdentifier tableId = TableIdentifier.of(Namespace.empty(), tableName);
       assertTrue(catalog.tableExists(tableId));
-      TableScan scan = catalog.loadTable(tableId).newScan();
+      Table icebergTable = catalog.loadTable(tableId);
+      assertTrue(expectedSchema.sameSchema(icebergTable.schema()));
+      TableScan scan = icebergTable.newScan();
       if (filterExpression != null) {
         scan = scan.filter(filterExpression);
       }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
index f81f133..f424e3a 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
@@ -20,7 +20,9 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -30,10 +32,14 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -101,27 +107,46 @@
             .catalogName(catalogName)
             .catalogOptions(OPTIONS)
             .build();
-    Table mockTable = mock(Table.class);
+    BaseTable mockInitialTable = mock(BaseTable.class);
+    Table loadedTable = mock(Table.class);
     when(mockCatalog.tableExists(IDENTIFIER)).thenReturn(false);
     Schema schema = new Schema();
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     when(mockCatalog.createTable(
-            IDENTIFIER,
-            schema,
-            partitionSpec,
-            BASE_PATH,
-            Collections.singletonMap(
-                TableProperties.DEFAULT_NAME_MAPPING,
-                NameMappingParser.toJson(MappingUtil.create(schema)))))
-        .thenReturn(mockTable);
+            eq(IDENTIFIER),
+            any(),
+            eq(PartitionSpec.unpartitioned()),
+            eq(BASE_PATH),
+            eq(
+                Collections.singletonMap(
+                    TableProperties.DEFAULT_NAME_MAPPING,
+                    NameMappingParser.toJson(MappingUtil.create(schema))))))
+        .thenReturn(mockInitialTable);
+    when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(loadedTable);
 
-    IcebergTableManager tableManager = IcebergTableManager.of(CONFIGURATION);
+    TableOperations tableOperations = mock(TableOperations.class);
+    when(mockInitialTable.operations()).thenReturn(tableOperations);
+    TableMetadata initialMetadata = mock(TableMetadata.class);
+    when(tableOperations.current()).thenReturn(initialMetadata);
+    try (MockedStatic<TableMetadata> tableMetadataMockedStatic = mockStatic(TableMetadata.class)) {
+      TableMetadata.Builder mockBuilder = mock(TableMetadata.Builder.class);
+      tableMetadataMockedStatic
+          .when(() -> TableMetadata.buildFrom(initialMetadata))
+          .thenReturn(mockBuilder);
+      TableMetadata updatedMetadata = mock(TableMetadata.class);
+      when(mockBuilder.build()).thenReturn(updatedMetadata);
 
-    Table actual =
-        tableManager.getOrCreateTable(catalogConfig, IDENTIFIER, BASE_PATH, schema, partitionSpec);
-    assertEquals(mockTable, actual);
-    verify(mockCatalog).initialize(catalogName, OPTIONS);
-    verify(mockCatalog, never()).loadTable(any());
+      IcebergTableManager tableManager = IcebergTableManager.of(CONFIGURATION);
+
+      Table actual =
+          tableManager.getOrCreateTable(
+              catalogConfig, IDENTIFIER, BASE_PATH, schema, partitionSpec);
+      assertEquals(loadedTable, actual);
+      verify(mockCatalog).initialize(catalogName, OPTIONS);
+      verify(tableOperations).commit(initialMetadata, updatedMetadata);
+      verify(mockBuilder).setCurrentSchema(schema, schema.highestFieldId());
+      verify(mockBuilder).setDefaultPartitionSpec(partitionSpec);
+    }
   }
 
   @Test
@@ -139,13 +164,14 @@
     Schema schema = new Schema();
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     when(mockCatalog.createTable(
-            IDENTIFIER,
-            schema,
-            partitionSpec,
-            BASE_PATH,
-            Collections.singletonMap(
-                TableProperties.DEFAULT_NAME_MAPPING,
-                NameMappingParser.toJson(MappingUtil.create(schema)))))
+            eq(IDENTIFIER),
+            any(),
+            any(),
+            eq(BASE_PATH),
+            eq(
+                Collections.singletonMap(
+                    TableProperties.DEFAULT_NAME_MAPPING,
+                    NameMappingParser.toJson(MappingUtil.create(schema))))))
         .thenThrow(new AlreadyExistsException("Table already exists"));
     when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(mockTable);
 
diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml
index 5e37be6..b0f09a3 100644
--- a/xtable-utilities/pom.xml
+++ b/xtable-utilities/pom.xml
@@ -43,6 +43,12 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.xtable</groupId>
+            <artifactId>xtable-hudi-support-extensions_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.xtable</groupId>
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
index 52cff85..24aa5ad 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
@@ -133,7 +133,7 @@
     Path icebergMetadataPath = Paths.get(URI.create(basePath + "/metadata"));
     long icebergMetadataFiles =
         Files.list(icebergMetadataPath).filter(p -> p.toString().endsWith("metadata.json")).count();
-    Assertions.assertEquals(2, icebergMetadataFiles);
+    Assertions.assertEquals(3, icebergMetadataFiles);
     Path deltaMetadataPath = Paths.get(URI.create(basePath + "/_delta_log"));
     long deltaMetadataFiles =
         Files.list(deltaMetadataPath).filter(p -> p.toString().endsWith(".json")).count();
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
index 2294e16..f18ce86 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
@@ -55,7 +55,7 @@
       String[] args = new String[] {"--datasetConfig", configFile.getPath()};
       RunSync.main(args);
       Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
-      waitForNumIcebergCommits(icebergMetadataPath, 2);
+      waitForNumIcebergCommits(icebergMetadataPath, 3);
     }
   }
 
@@ -64,7 +64,7 @@
     ExecutorService runner = Executors.newSingleThreadExecutor();
     String tableName = "test-table";
     try (GenericTable table =
-        TestJavaHudiTable.forStandardSchema(
+        TestJavaHudiTable.forStandardSchemaWithFieldIds(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
       table.insertRows(20);
       File configFile = writeConfigFile(tempDir, table, tableName);
@@ -78,11 +78,16 @@
             }
           });
       Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
-      waitForNumIcebergCommits(icebergMetadataPath, 2);
+      waitForNumIcebergCommits(icebergMetadataPath, 3);
+    }
+    try (GenericTable table =
+        TestJavaHudiTable.withAdditionalColumnsAndFieldIds(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
       // write more data now that table is initialized and data is synced
       table.insertRows(20);
-      waitForNumIcebergCommits(icebergMetadataPath, 3);
-      assertEquals(3, numIcebergMetadataJsonFiles(icebergMetadataPath));
+      Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
+      waitForNumIcebergCommits(icebergMetadataPath, 6);
+      assertEquals(6, numIcebergMetadataJsonFiles(icebergMetadataPath));
     } finally {
       runner.shutdownNow();
     }