Changes
diff --git a/core/src/test/java/io/onetable/GenericTable.java b/core/src/test/java/io/onetable/GenericTable.java
index 3eda7c4..0e7fa56 100644
--- a/core/src/test/java/io/onetable/GenericTable.java
+++ b/core/src/test/java/io/onetable/GenericTable.java
@@ -125,6 +125,39 @@
     }
   }
 
+  static GenericTable getInstanceWithCommonSchema(
+      String tableName,
+      Path tempDir,
+      SparkSession sparkSession,
+      JavaSparkContext jsc,
+      TableFormat sourceFormat) {
+    switch (sourceFormat) {
+      case HUDI:
+        return TestSparkHudiTable.forGivenSchemaAndPartitioning(
+            tableName,
+            tempDir,
+            jsc,
+            TestCommonTableHelper.getCommonSchemaInAvroFormat(),
+            TestCommonTableHelper.getHudiPartitionConfig());
+      case DELTA:
+        return TestSparkDeltaTable.forGivenSchemaAndPartitioning(
+            tableName,
+            tempDir,
+            sparkSession,
+            TestCommonTableHelper.getCommonSchemaInStructFormat(),
+            TestCommonTableHelper.getDeltaPartitionField());
+      case ICEBERG:
+        return TestIcebergTable.forGivenSchemaAndPartitioning(
+            tableName,
+            tempDir,
+            jsc.hadoopConfiguration(),
+            TestCommonTableHelper.getCommonSchemaInIcebergFormat(),
+            TestCommonTableHelper.getIcebergPartitionField());
+      default:
+        throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
+    }
+  }
+
   static String getTableName() {
     return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_");
   }
diff --git a/core/src/test/java/io/onetable/ITOneTableClient.java b/core/src/test/java/io/onetable/ITOneTableClient.java
index 7609cf8..0ca9d97 100644
--- a/core/src/test/java/io/onetable/ITOneTableClient.java
+++ b/core/src/test/java/io/onetable/ITOneTableClient.java
@@ -670,6 +670,148 @@
     }
   }
 
+  @Test
+  public void roundTripTesting() {
+    SourceClientProvider<?> hudiSourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
+    SourceClientProvider<?> icebergSourceClientProvider =
+        getSourceClientProvider(TableFormat.ICEBERG);
+    SourceClientProvider<?> deltaSourceClientProvider = getSourceClientProvider(TableFormat.DELTA);
+    String onetablePartitionConfig = "level:VALUE";
+    SyncMode syncMode = SyncMode.INCREMENTAL;
+
+    // Create the table in Hudi and sync it to Iceberg and Delta.
+    String tableName = getTableName();
+    OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
+    List<?> insertRecords;
+    try (GenericTable table =
+        GenericTable.getInstanceWithCommonSchema(
+            tableName, tempDir, sparkSession, jsc, TableFormat.HUDI)) {
+      insertRecords = table.insertRows(100);
+
+      PerTableConfig perTableConfig =
+          PerTableConfig.builder()
+              .tableName(tableName)
+              .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA))
+              .tableBasePath(table.getBasePath())
+              .tableDataPath(table.getDataPath())
+              .hudiSourceConfig(
+                  HudiSourceConfig.builder()
+                      .partitionFieldSpecConfig(onetablePartitionConfig)
+                      .build())
+              .syncMode(syncMode)
+              .build();
+      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 100);
+
+      // Make multiple commits, then sync.
+      table.insertRows(100);
+      table.upsertRows(insertRecords.subList(0, 20));
+      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.HUDI, table, Arrays.asList(TableFormat.DELTA, TableFormat.ICEBERG), 200);
+    }
+
+    // Now read the same table as Delta and sync to Hudi and Iceberg.
+    try (GenericTable table =
+        GenericTable.getInstanceWithCommonSchema(
+            tableName, tempDir, sparkSession, jsc, TableFormat.DELTA)) {
+      PerTableConfig perTableConfig =
+          PerTableConfig.builder()
+              .tableName(tableName)
+              .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+              .tableBasePath(table.getBasePath())
+              .tableDataPath(table.getDataPath())
+              .hudiSourceConfig(
+                  HudiSourceConfig.builder()
+                      .partitionFieldSpecConfig(onetablePartitionConfig)
+                      .build())
+              .syncMode(syncMode)
+              .build();
+
+      table.insertRows(100);
+      oneTableClient.sync(perTableConfig, deltaSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 300);
+
+      table.insertRows(100);
+      oneTableClient.sync(perTableConfig, deltaSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 400);
+
+      table.upsertRows(insertRecords.subList(0, 20));
+      oneTableClient.sync(perTableConfig, deltaSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 400);
+    }
+
+    // Read the table as Iceberg and sync to Hudi and Delta.
+    try (GenericTable table =
+        GenericTable.getInstanceWithCommonSchema(
+            tableName, tempDir, sparkSession, jsc, TableFormat.ICEBERG)) {
+      PerTableConfig perTableConfig =
+          PerTableConfig.builder()
+              .tableName(tableName)
+              .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+              .tableBasePath(table.getBasePath())
+              .tableDataPath(table.getDataPath())
+              .hudiSourceConfig(
+                  HudiSourceConfig.builder()
+                      .partitionFieldSpecConfig(onetablePartitionConfig)
+                      .build())
+              .syncMode(syncMode)
+              .build();
+
+      table.insertRows(100);
+      oneTableClient.sync(perTableConfig, icebergSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 500);
+
+      table.insertRows(100);
+      oneTableClient.sync(perTableConfig, icebergSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 600);
+
+      table.upsertRows(insertRecords.subList(0, 20));
+      oneTableClient.sync(perTableConfig, icebergSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 600);
+    }
+
+    // Finally, read the table as Hudi once more and sync to Iceberg and Delta.
+    try (GenericTable table =
+        GenericTable.getInstanceWithCommonSchema(
+            tableName, tempDir, sparkSession, jsc, TableFormat.HUDI)) {
+      PerTableConfig perTableConfig =
+          PerTableConfig.builder()
+              .tableName(tableName)
+              .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA))
+              .tableBasePath(table.getBasePath())
+              .tableDataPath(table.getDataPath())
+              .hudiSourceConfig(
+                  HudiSourceConfig.builder()
+                      .partitionFieldSpecConfig(onetablePartitionConfig)
+                      .build())
+              .syncMode(syncMode)
+              .build();
+
+      table.insertRows(100);
+      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 700);
+
+      table.insertRows(100);
+      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 800);
+
+      table.upsertRows(insertRecords.subList(0, 20));
+      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      checkDatasetEquivalence(
+          TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 800);
+    }
+  }
+
   private Map<String, String> getTimeTravelOption(TableFormat tableFormat, Instant time) {
     Map<String, String> options = new HashMap<>();
     switch (tableFormat) {
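
The four round-trip blocks in the new test differ only in the source client provider and the target formats; the PerTableConfig wiring is otherwise identical. A minimal sketch of a private helper the test class could use to cut that repetition — the helper name getRoundTripPerTableConfig is hypothetical and not part of this change:

  // Hypothetical helper (illustration only): builds the config used before each sync call above.
  private PerTableConfig getRoundTripPerTableConfig(
      GenericTable table, String tableName, List<TableFormat> targetFormats) {
    return PerTableConfig.builder()
        .tableName(tableName)
        .targetTableFormats(targetFormats)
        .tableBasePath(table.getBasePath())
        .tableDataPath(table.getDataPath())
        .hudiSourceConfig(
            HudiSourceConfig.builder().partitionFieldSpecConfig("level:VALUE").build())
        .syncMode(SyncMode.INCREMENTAL)
        .build();
  }
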
diff --git a/core/src/test/java/io/onetable/TestAbstractHudiTable.java b/core/src/test/java/io/onetable/TestAbstractHudiTable.java
index 8fe5c1b..7e2fce6 100644
--- a/core/src/test/java/io/onetable/TestAbstractHudiTable.java
+++ b/core/src/test/java/io/onetable/TestAbstractHudiTable.java
@@ -761,6 +761,6 @@
 
   @Override
   public String getOrderByColumn() {
-    return "_hoodie_record_key";
+    return RECORD_KEY_FIELD_NAME;
   }
 }
diff --git a/core/src/test/java/io/onetable/TestCommonTableHelper.java b/core/src/test/java/io/onetable/TestCommonTableHelper.java
new file mode 100644
index 0000000..beb76a9
--- /dev/null
+++ b/core/src/test/java/io/onetable/TestCommonTableHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.onetable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.types.StructType;
+
+import io.onetable.avro.AvroSchemaConverter;
+import io.onetable.delta.DeltaSchemaExtractor;
+import io.onetable.iceberg.IcebergSchemaExtractor;
+import io.onetable.model.schema.OneSchema;
+
+public class TestCommonTableHelper {
+  private static Schema COMMON_SCHEMA;
+  private static OneSchema COMMON_ONE_SCHEMA;
+
+  static {
+    try (InputStream inputStream =
+        GenericTable.class.getClassLoader().getResourceAsStream("schemas/common_schema.avsc")) {
+      COMMON_SCHEMA = new Schema.Parser().parse(inputStream);
+      COMMON_ONE_SCHEMA = AvroSchemaConverter.getInstance().toOneSchema(COMMON_SCHEMA);
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  public static Schema getCommonSchemaInAvroFormat() {
+    return COMMON_SCHEMA;
+  }
+
+  public static StructType getCommonSchemaInStructFormat() {
+    return DeltaSchemaExtractor.getInstance().fromOneSchema(COMMON_ONE_SCHEMA);
+  }
+
+  public static org.apache.iceberg.Schema getCommonSchemaInIcebergFormat() {
+    return IcebergSchemaExtractor.getInstance().toIceberg(COMMON_ONE_SCHEMA);
+  }
+
+  public static String getHudiPartitionConfig() {
+    return "level:SIMPLE";
+  }
+
+  public static String getDeltaPartitionField() {
+    return "level";
+  }
+
+  public static String getIcebergPartitionField() {
+    return "level";
+  }
+}
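
TestCommonTableHelper keeps the Avro resource as the single source of truth and derives the Delta and Iceberg variants from the same OneSchema, so a field added to common_schema.avsc flows into all three factories. A minimal usage sketch (the test class name is illustrative, and field names are compared as sets to avoid assuming the converters preserve ordering):

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;

public class TestCommonSchemaViews {
  @Test
  void allFormatsExposeTheSameColumns() {
    // Field names as seen through each format-specific view of the common schema.
    List<String> avro =
        TestCommonTableHelper.getCommonSchemaInAvroFormat().getFields().stream()
            .map(org.apache.avro.Schema.Field::name)
            .collect(Collectors.toList());
    List<String> delta =
        Arrays.asList(TestCommonTableHelper.getCommonSchemaInStructFormat().fieldNames());
    List<String> iceberg =
        TestCommonTableHelper.getCommonSchemaInIcebergFormat().columns().stream()
            .map(org.apache.iceberg.types.Types.NestedField::name)
            .collect(Collectors.toList());
    assertEquals(new HashSet<>(avro), new HashSet<>(delta));
    assertEquals(new HashSet<>(avro), new HashSet<>(iceberg));
  }
}
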
diff --git a/core/src/test/java/io/onetable/TestIcebergTable.java b/core/src/test/java/io/onetable/TestIcebergTable.java
index ada0880..3d5bf0c 100644
--- a/core/src/test/java/io/onetable/TestIcebergTable.java
+++ b/core/src/test/java/io/onetable/TestIcebergTable.java
@@ -18,6 +18,7 @@
  
 package io.onetable;
 
+import static io.onetable.iceberg.TestIcebergDataHelper.DEFAULT_RECORD_KEY_FIELD;
 import static io.onetable.iceberg.TestIcebergDataHelper.createIcebergDataHelper;
 import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
 import static org.junit.jupiter.api.Assertions.*;
@@ -31,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -70,7 +72,6 @@
 
 @Getter
 public class TestIcebergTable implements GenericTable<Record, String> {
-  private static final String DEFAULT_RECORD_KEY_FIELD = "id";
   private static final List<String> DEFAULT_PARTITION_FIELDS = Collections.singletonList("level");
 
   private final String tableName;
@@ -87,7 +88,7 @@
         tableName,
         tempDir,
         hadoopConf,
-        DEFAULT_RECORD_KEY_FIELD,
+        Optional.empty(),
         Collections.singletonList(partitionField),
         false);
   }
@@ -98,23 +99,38 @@
         tableName,
         tempDir,
         hadoopConf,
-        DEFAULT_RECORD_KEY_FIELD,
+        Optional.empty(),
         Collections.singletonList(partitionField),
         true);
   }
 
+  public static TestIcebergTable forGivenSchemaAndPartitioning(
+      String tableName,
+      Path tempDir,
+      Configuration hadoopConf,
+      Schema tableSchema,
+      String partitionField) {
+    return new TestIcebergTable(
+        tableName,
+        tempDir,
+        hadoopConf,
+        Optional.of(tableSchema),
+        Collections.singletonList(partitionField),
+        false);
+  }
+
   public TestIcebergTable(
       String tableName,
       Path tempDir,
       Configuration hadoopConf,
-      String recordKeyField,
+      Optional<Schema> tableSchema,
       List<String> partitionFields,
       boolean includeAdditionalColumns) {
     this.tableName = tableName;
     this.basePath = tempDir.toUri().toString();
     this.icebergDataHelper =
         createIcebergDataHelper(
-            recordKeyField, filterNullFields(partitionFields), includeAdditionalColumns);
+            tableSchema, filterNullFields(partitionFields), includeAdditionalColumns);
     this.schema = icebergDataHelper.getTableSchema();
 
     PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec();
@@ -294,7 +310,7 @@
 
   @Override
   public String getFilterQuery() {
-    return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField());
+    return String.format("%s > 'aaa'", DEFAULT_RECORD_KEY_FIELD);
   }
 
   public Long getLastCommitTimestamp() {
diff --git a/core/src/test/java/io/onetable/TestJavaHudiTable.java b/core/src/test/java/io/onetable/TestJavaHudiTable.java
index 0130787..8ae179b 100644
--- a/core/src/test/java/io/onetable/TestJavaHudiTable.java
+++ b/core/src/test/java/io/onetable/TestJavaHudiTable.java
@@ -36,6 +36,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.HoodieJavaWriteClient;
@@ -96,39 +97,23 @@
   }
 
   /**
-   * Create a test table instance with a schema that has more fields than an instance returned by
-   * {@link #forStandardSchema(String, Path, String, HoodieTableType)}. Specifically this instance
-   * will add a top level field, nested field, field within a list, and field within a map to ensure
-   * schema evolution is properly handled.
+   * Create a test table instance for general testing, backed by the given schema and partitioned
+   * by the given partition config when one is provided.
    *
-   * @param tableName name of the table used in the test, should be unique per test within a shared
-   *     directory
-   * @param tempDir directory where table will be written, typically a temporary directory that will
-   *     be cleaned up after the tests.
-   * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field` for the
-   *     {@link CustomKeyGenerator}. If null, {@link NonpartitionedKeyGenerator} will be used.
-   * @param tableType the table type to use (MoR or CoW)
-   * @return an instance of the class with this configuration
+   * @param tableName name of the table used in the test, should be unique per test within a shared
+   *     directory
+   * @param tempDir directory where the table will be written, typically a temporary directory that
+   *     will be cleaned up after the tests
+   * @param jsc the {@link JavaSparkContext} to use when writing data with Hudi
+   * @param tableSchema the Avro schema to use for the table
+   * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field`; if
+   *     null, the table is not partitioned
+   * @return an instance of the class with this configuration
    */
-  public static TestJavaHudiTable withAdditionalColumns(
-      String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
-    return new TestJavaHudiTable(
-        tableName,
-        addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
-        tempDir,
-        partitionConfig,
-        tableType,
-        null);
-  }
-
-  public static TestJavaHudiTable withAdditionalTopLevelField(
+  public static TestJavaHudiTable forGivenSchemaAndPartitioning(
       String tableName,
       Path tempDir,
-      String partitionConfig,
-      HoodieTableType tableType,
-      Schema previousSchema) {
-    return new TestJavaHudiTable(
-        tableName, addTopLevelField(previousSchema), tempDir, partitionConfig, tableType, null);
+      JavaSparkContext jsc,
+      Schema tableSchema,
+      String partitionConfig) {
+    return withSchema(
+        tableName, tempDir, partitionConfig, HoodieTableType.COPY_ON_WRITE, tableSchema);
   }
 
   public static TestJavaHudiTable withSchema(
diff --git a/core/src/test/java/io/onetable/TestSparkDeltaTable.java b/core/src/test/java/io/onetable/TestSparkDeltaTable.java
index 89bc9f2..793d664 100644
--- a/core/src/test/java/io/onetable/TestSparkDeltaTable.java
+++ b/core/src/test/java/io/onetable/TestSparkDeltaTable.java
@@ -29,6 +29,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import lombok.Getter;
@@ -39,6 +40,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.StructType;
 
 import org.apache.spark.sql.delta.DeltaLog;
 
@@ -63,18 +65,31 @@
 
   public static TestSparkDeltaTable forStandardSchemaAndPartitioning(
       String tableName, Path tempDir, SparkSession sparkSession, String partitionField) {
-    return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, false);
+    return new TestSparkDeltaTable(
+        tableName, tempDir, sparkSession, Optional.empty(), partitionField, false);
   }
 
   public static TestSparkDeltaTable forSchemaWithAdditionalColumnsAndPartitioning(
       String tableName, Path tempDir, SparkSession sparkSession, String partitionField) {
-    return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true);
+    return new TestSparkDeltaTable(
+        tableName, tempDir, sparkSession, Optional.empty(), partitionField, true);
+  }
+
+  public static TestSparkDeltaTable forGivenSchemaAndPartitioning(
+      String tableName,
+      Path tempDir,
+      SparkSession sparkSession,
+      StructType tableSchema,
+      String partitionField) {
+    return new TestSparkDeltaTable(
+        tableName, tempDir, sparkSession, Optional.of(tableSchema), partitionField, false);
   }
 
   public TestSparkDeltaTable(
       String name,
       Path tempDir,
       SparkSession sparkSession,
+      Optional<StructType> tableSchema,
       String partitionField,
       boolean includeAdditionalColumns) {
     try {
@@ -83,7 +98,8 @@
       this.sparkSession = sparkSession;
       this.partitionField = partitionField;
       this.includeAdditionalColumns = includeAdditionalColumns;
-      this.testDeltaHelper = createTestDataHelper(partitionField, includeAdditionalColumns);
+      this.testDeltaHelper =
+          createTestDataHelper(tableSchema, partitionField, includeAdditionalColumns);
       testDeltaHelper.createTable(sparkSession, tableName, basePath);
       this.deltaLog = DeltaLog.forTable(sparkSession, basePath);
       this.deltaTable = DeltaTable.forPath(sparkSession, basePath);
@@ -118,7 +134,7 @@
 
   @Override
   public String getOrderByColumn() {
-    return "id";
+    return "key";
   }
 
   @SneakyThrows
@@ -199,7 +215,8 @@
   }
 
   private String initBasePath(Path tempDir, String tableName) throws IOException {
-    Path basePath = tempDir.resolve(tableName);
+    // Suffix the directory so the base path is decoupled from the table name.
+    Path basePath = tempDir.resolve(tableName + "_v1");
     Files.createDirectories(basePath);
     return basePath.toUri().toString();
   }
diff --git a/core/src/test/java/io/onetable/TestSparkHudiTable.java b/core/src/test/java/io/onetable/TestSparkHudiTable.java
index 205d54c..4e3e45f 100644
--- a/core/src/test/java/io/onetable/TestSparkHudiTable.java
+++ b/core/src/test/java/io/onetable/TestSparkHudiTable.java
@@ -115,34 +115,23 @@
   }
 
   /**
-   * Create a test table instance with a schema that has more fields than an instance returned by
-   * {@link #forStandardSchema(String, Path, JavaSparkContext, String, HoodieTableType)}.
-   * Specifically this instance will add a top level field, nested field, field within a list, and
-   * field within a map to ensure schema evolution is properly handled.
+   * Create a test table instance for general testing, backed by the given schema and partitioned
+   * by the given partition config when one is provided.
    *
-   * @param tableName name of the table used in the test, should be unique per test within a shared
-   *     directory
-   * @param tempDir directory where table will be written, typically a temporary directory that will
-   *     be cleaned up after the tests.
-   * @param jsc the {@link JavaSparkContext} to use when writing data with Hudi
-   * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field` for the
-   *     {@link CustomKeyGenerator}. If null, {@link NonpartitionedKeyGenerator} will be used.
-   * @param tableType the table type to use (MoR or CoW)
-   * @return an instance of the class with this configuration
+   * @param tableName name of the table used in the test, should be unique per test within a shared
+   *     directory
+   * @param tempDir directory where the table will be written, typically a temporary directory that
+   *     will be cleaned up after the tests
+   * @param jsc the {@link JavaSparkContext} to use when writing data with Hudi
+   * @param tableSchema the Avro schema to use for the table
+   * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field`; if
+   *     null, the table is not partitioned
+   * @return an instance of the class with this configuration
    */
-  public static TestSparkHudiTable withAdditionalColumns(
+  public static TestSparkHudiTable forGivenSchemaAndPartitioning(
       String tableName,
       Path tempDir,
       JavaSparkContext jsc,
-      String partitionConfig,
-      HoodieTableType tableType) {
+      Schema tableSchema,
+      String partitionConfig) {
     return new TestSparkHudiTable(
-        tableName,
-        addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
-        tempDir,
-        jsc,
-        partitionConfig,
-        tableType);
+        tableName, tableSchema, tempDir, jsc, partitionConfig, HoodieTableType.COPY_ON_WRITE);
   }
 
   private TestSparkHudiTable(
diff --git a/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java b/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java
index c50ddd6..149468c 100644
--- a/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java
+++ b/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java
@@ -35,6 +35,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -323,7 +324,12 @@
     String tableName = getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
         new TestSparkDeltaTable(
-            tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+            tableName,
+            tempDir,
+            sparkSession,
+            Optional.empty(),
+            isPartitioned ? "yearOfBirth" : null,
+            false);
     List<List<String>> allActiveFiles = new ArrayList<>();
     List<TableChange> allTableChanges = new ArrayList<>();
     List<Row> rows = testSparkDeltaTable.insertRows(50);
@@ -378,7 +384,12 @@
     String tableName = getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
         new TestSparkDeltaTable(
-            tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+            tableName,
+            tempDir,
+            sparkSession,
+            Optional.empty(),
+            isPartitioned ? "yearOfBirth" : null,
+            false);
     // Insert 50 rows to 2018 partition.
     List<Row> commit1Rows = testSparkDeltaTable.insertRowsForPartition(50, 2018);
     Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
@@ -428,7 +439,12 @@
     String tableName = getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
         new TestSparkDeltaTable(
-            tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+            tableName,
+            tempDir,
+            sparkSession,
+            Optional.empty(),
+            isPartitioned ? "yearOfBirth" : null,
+            false);
     List<List<String>> allActiveFiles = new ArrayList<>();
     List<TableChange> allTableChanges = new ArrayList<>();
     List<Row> rows = testSparkDeltaTable.insertRows(50);
@@ -482,7 +498,12 @@
     String tableName = getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
         new TestSparkDeltaTable(
-            tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, true);
+            tableName,
+            tempDir,
+            sparkSession,
+            Optional.empty(),
+            isPartitioned ? "yearOfBirth" : null,
+            true);
     List<List<String>> allActiveFiles = new ArrayList<>();
     List<TableChange> allTableChanges = new ArrayList<>();
     List<Row> rows = testSparkDeltaTable.insertRows(50);
@@ -526,7 +547,8 @@
   public void testDropPartition() {
     String tableName = getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
-        new TestSparkDeltaTable(tableName, tempDir, sparkSession, "yearOfBirth", false);
+        new TestSparkDeltaTable(
+            tableName, tempDir, sparkSession, Optional.empty(), "yearOfBirth", false);
     List<List<String>> allActiveFiles = new ArrayList<>();
     List<TableChange> allTableChanges = new ArrayList<>();
 
@@ -583,7 +605,12 @@
     String tableName = getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
         new TestSparkDeltaTable(
-            tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+            tableName,
+            tempDir,
+            sparkSession,
+            Optional.empty(),
+            isPartitioned ? "yearOfBirth" : null,
+            false);
     List<List<String>> allActiveFiles = new ArrayList<>();
     List<TableChange> allTableChanges = new ArrayList<>();
     List<Row> rows = testSparkDeltaTable.insertRows(50);
diff --git a/core/src/test/java/io/onetable/delta/TestDeltaHelper.java b/core/src/test/java/io/onetable/delta/TestDeltaHelper.java
index a1d1ad1..934552a 100644
--- a/core/src/test/java/io/onetable/delta/TestDeltaHelper.java
+++ b/core/src/test/java/io/onetable/delta/TestDeltaHelper.java
@@ -39,6 +39,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -76,7 +77,7 @@
 
   private static final StructField[] COMMON_FIELDS =
       new StructField[] {
-        new StructField("id", IntegerType, false, Metadata.empty()),
+        new StructField("key", StringType, false, Metadata.empty()),
         new StructField("firstName", StringType, true, Metadata.empty()),
         new StructField("lastName", StringType, true, Metadata.empty()),
         new StructField("gender", StringType, true, Metadata.empty()),
@@ -108,7 +109,13 @@
   private static final StructField[] ADDITIONAL_FIELDS =
       new StructField[] {new StructField("street", StringType, true, Metadata.empty())};
   private static final StructField[] DATE_PARTITIONED_FIELDS =
-      new StructField[] {new StructField("yearOfBirth", IntegerType, true, Metadata.empty())};
+      new StructField[] {
+        new StructField(
+            "yearOfBirth",
+            IntegerType,
+            true,
+            Metadata.fromJson("{\"delta.generationExpression\": \"YEAR(birthDate)\"}"))
+      };
 
   private static final Random RANDOM = new Random();
   private static final String[] GENDERS = {"Male", "Female"};
@@ -118,8 +125,11 @@
   boolean includeAdditionalColumns;
 
   public static TestDeltaHelper createTestDataHelper(
-      String partitionField, boolean includeAdditionalColumns) {
-    StructType tableSchema = generateDynamicSchema(partitionField, includeAdditionalColumns);
+      Optional<StructType> tblInputSchema,
+      String partitionField,
+      boolean includeAdditionalColumns) {
+    StructType tableSchema =
+        tblInputSchema.orElse(generateDynamicSchema(partitionField, includeAdditionalColumns));
     return TestDeltaHelper.builder()
         .tableStructSchema(tableSchema)
         .partitionField(partitionField)
@@ -142,26 +152,31 @@
   public void createTable(SparkSession sparkSession, String tableName, String basePath) {
     DeltaTableBuilder tableBuilder =
         DeltaTable.createIfNotExists(sparkSession).tableName(tableName).location(basePath);
-    Arrays.stream(COMMON_FIELDS).forEach(tableBuilder::addColumn);
-    if ("yearOfBirth".equals(partitionField)) {
-      tableBuilder
-          .addColumn(
-              DeltaTable.columnBuilder("yearOfBirth")
-                  .dataType(IntegerType)
-                  .generatedAlwaysAs("YEAR(birthDate)")
-                  .build())
-          .partitionedBy("yearOfBirth");
-    } else if ("level".equals(partitionField)) {
-      tableBuilder.partitionedBy(partitionField);
+    for (StructField sf : tableStructSchema.fields()) {
+      tableBuilder = addFieldToTableBuilder(tableBuilder, sf);
+    }
+    if ("yearOfBirth".equals(partitionField) || "level".equals(partitionField)) {
+      tableBuilder = tableBuilder.partitionedBy(partitionField);
     } else if (partitionField != null) {
       throw new IllegalArgumentException("Unexpected partition field: " + partitionField);
     }
-    if (includeAdditionalColumns) {
-      tableBuilder.addColumn("street", StringType);
-    }
     tableBuilder.execute();
   }
 
+  private DeltaTableBuilder addFieldToTableBuilder(DeltaTableBuilder tableBuilder, StructField sf) {
+    if (sf.metadata().contains("delta.generationExpression")) {
+      String generatedExpression = sf.metadata().getString("delta.generationExpression");
+      if (generatedExpression != null) {
+        return tableBuilder.addColumn(
+            DeltaTable.columnBuilder(sf.name())
+                .dataType(sf.dataType())
+                .generatedAlwaysAs(generatedExpression)
+                .build());
+      }
+    }
+    return tableBuilder.addColumn(sf.name(), sf.dataType());
+  }
+
   public Row generateRandomRow() {
     int year = 2013 + RANDOM.nextInt(11);
     String levelValue = LEVEL_VALUES.get(RANDOM.nextInt(LEVEL_VALUES.size()));
@@ -186,8 +201,8 @@
 
   private Object generateValueForField(StructField field, int yearValue, String levelValue) {
     switch (field.name()) {
-      case "id":
-        return ID_GENERATOR.incrementAndGet();
+      case "key":
+        return generateRandomString();
       case "gender":
         return GENDERS[RANDOM.nextInt(GENDERS.length)];
       case "birthDate":
diff --git a/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java b/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java
index 8e9da45..5379e3c 100644
--- a/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java
+++ b/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java
@@ -47,7 +47,7 @@
   private static final Map<String, StructField> STRUCT_FIELD_MAP =
       new HashMap<String, StructField>() {
         {
-          put("id", DataTypes.createStructField("id", DataTypes.IntegerType, false));
+          put("key", DataTypes.createStructField("key", DataTypes.IntegerType, false));
           put("firstName", DataTypes.createStructField("firstName", DataTypes.StringType, false));
           put("gender", DataTypes.createStructField("gender", DataTypes.StringType, false));
           put(
@@ -115,7 +115,7 @@
   @Test
   public void testUnpartitionedTable() {
     StructType tableSchema =
-        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate"));
+        getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate"));
     OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
     List<OnePartitionField> onePartitionFields =
         deltaPartitionExtractor.convertFromDeltaPartitionFormat(oneSchema, new StructType());
@@ -125,7 +125,7 @@
   @Test
   public void testSimplePartitionedTable() {
     StructType tableSchema =
-        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate"));
+        getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate"));
     StructType partitionSchema = getSchemaWithFields(Arrays.asList("gender"));
     OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
     List<OnePartitionField> expectedOnePartitionFields =
@@ -146,7 +146,8 @@
   @Test
   public void testDatePartitionedGeneratedColumnsTable() {
     StructType tableSchema =
-        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateOfBirth"));
+        getSchemaWithFields(
+            Arrays.asList("key", "firstName", "gender", "birthDate", "dateOfBirth"));
     StructType partitionSchema = getSchemaWithFields(Arrays.asList("dateOfBirth"));
     OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
     List<OnePartitionField> expectedOnePartitionFields =
@@ -164,7 +165,7 @@
   @Test
   public void testDateFormatPartitionedGeneratedColumnsTable() {
     StructType tableSchema =
-        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateFmt"));
+        getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate", "dateFmt"));
     StructType partitionSchema = getSchemaWithFields(Arrays.asList("dateFmt"));
     OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
     List<OnePartitionField> expectedOnePartitionFields =
@@ -182,7 +183,8 @@
   @Test
   public void yearPartitionedGeneratedColumnsTable() {
     StructType tableSchema =
-        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "yearOfBirth"));
+        getSchemaWithFields(
+            Arrays.asList("key", "firstName", "gender", "birthDate", "yearOfBirth"));
     StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth"));
     OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
     List<OnePartitionField> expectedOnePartitionFields =
@@ -200,8 +202,9 @@
   @Test
   public void yearAndSimpleCombinedPartitionedGeneratedColumnsTable() {
     StructType tableSchema =
-        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "yearOfBirth"));
-    StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth", "id"));
+        getSchemaWithFields(
+            Arrays.asList("key", "firstName", "gender", "birthDate", "yearOfBirth"));
+    StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth", "key"));
     OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
     List<OnePartitionField> expectedOnePartitionFields =
         Arrays.asList(
@@ -213,7 +216,7 @@
             OnePartitionField.builder()
                 .sourceField(
                     OneField.builder()
-                        .name("id")
+                        .name("key")
                         .schema(OneSchema.builder().name("integer").dataType(OneType.INT).build())
                         .build())
                 .transformType(PartitionTransformType.VALUE)
@@ -228,7 +231,7 @@
     StructType tableSchema =
         getSchemaWithFields(
             Arrays.asList(
-                "id",
+                "key",
                 "firstName",
                 "gender",
                 "birthDate",
@@ -257,16 +260,16 @@
   @Test
   public void testCombinationOfPlainAndGeneratedColumns() {
     StructType tableSchema =
-        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateFmt"));
+        getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate", "dateFmt"));
     StructType partitionSchema =
-        getSchemaWithFields(Arrays.asList("id", "dateFmt", "gender", "dateOfBirth"));
+        getSchemaWithFields(Arrays.asList("key", "dateFmt", "gender", "dateOfBirth"));
     OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
     List<OnePartitionField> expectedOnePartitionFields =
         Arrays.asList(
             OnePartitionField.builder()
                 .sourceField(
                     OneField.builder()
-                        .name("id")
+                        .name("key")
                         .schema(OneSchema.builder().name("integer").dataType(OneType.INT).build())
                         .build())
                 .transformType(PartitionTransformType.VALUE)
diff --git a/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java b/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java
index c2fd739..84c6afe 100644
--- a/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java
+++ b/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java
@@ -99,7 +99,7 @@
   private static final HoodieEngineContext CONTEXT = new HoodieJavaEngineContext(CONFIGURATION);
 
   private static final String TABLE_NAME = "test_table";
-  private static final String KEY_FIELD_NAME = "id";
+  private static final String KEY_FIELD_NAME = "key";
   private static final String PARTITION_FIELD_NAME = "partition_field";
   private static final String OTHER_FIELD_NAME = "content";
   private static final long FILE_SIZE = 100L;
diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
index f2ad0c6..174a250 100644
--- a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
+++ b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -55,10 +56,12 @@
 @Builder
 @Value
 public class TestIcebergDataHelper {
+  public static final String DEFAULT_RECORD_KEY_FIELD = "key";
+
   private static final Random RANDOM = new Random();
   private static final List<Types.NestedField> COMMON_FIELDS =
       Arrays.asList(
-          NestedField.optional(1, "id", Types.StringType.get()),
+          NestedField.optional(1, "key", Types.StringType.get()),
           NestedField.optional(2, "ts", Types.LongType.get()),
           NestedField.optional(3, "level", Types.StringType.get()),
           NestedField.optional(4, "severity", Types.IntegerType.get()),
@@ -114,15 +117,14 @@
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
   Schema tableSchema;
-  String recordKeyField;
   List<String> partitionFieldNames;
 
   public static TestIcebergDataHelper createIcebergDataHelper(
-      String recordKeyField, List<String> partitionFields, boolean includeAdditionalColumns) {
-    Schema tableSchema = getSchema(includeAdditionalColumns);
+      Optional<Schema> tblSchema, List<String> partitionFields, boolean includeAdditionalColumns) {
+
+    Schema schema = tblSchema.orElse(getSchema(includeAdditionalColumns));
     return TestIcebergDataHelper.builder()
-        .tableSchema(tableSchema)
-        .recordKeyField(recordKeyField)
+        .tableSchema(schema)
         .partitionFieldNames(partitionFields)
         .build();
   }
@@ -193,7 +195,7 @@
       String fieldName = field.name();
       Object value;
 
-      if (fieldName.equals(recordKeyField)
+      if (fieldName.equals(DEFAULT_RECORD_KEY_FIELD)
           || (partitionFieldNames != null && partitionFieldNames.contains(fieldName))) {
         value = existingRecord.getField(fieldName);
       } else {
@@ -237,7 +239,7 @@
     Type fieldType = field.type();
     if (partitionValue != null && partitionFieldNames.contains(fieldName)) {
       return partitionValue;
-    } else if (fieldName.equals(recordKeyField)) {
+    } else if (fieldName.equals(DEFAULT_RECORD_KEY_FIELD)) {
       return keyValue;
     } else if (fieldName.equals("ts")) {
       return System.currentTimeMillis();
diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java b/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java
index 7a3c884..2499f07 100644
--- a/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java
+++ b/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java
@@ -45,12 +45,12 @@
       IcebergPartitionValueConverter.getInstance();
   private static final Schema SCHEMA =
       new Schema(
-          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(1, "key", Types.IntegerType.get()),
           Types.NestedField.optional(2, "name", Types.StringType.get()),
           Types.NestedField.optional(3, "birthDate", Types.TimestampType.withZone()));
   private static final Schema SCHEMA_WITH_PARTITION =
       new Schema(
-          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(1, "key", Types.IntegerType.get()),
           Types.NestedField.optional(2, "name", Types.StringType.get()),
           Types.NestedField.optional(3, "birthDate", Types.TimestampType.withZone()),
           Types.NestedField.optional(4, "birthDate_year", Types.IntegerType.get()));
diff --git a/core/src/test/resources/schemas/common_schema.avsc b/core/src/test/resources/schemas/common_schema.avsc
new file mode 100644
index 0000000..8659f3f
--- /dev/null
+++ b/core/src/test/resources/schemas/common_schema.avsc
@@ -0,0 +1,62 @@
+{
+  "type": "record",
+  "name": "CommonSchema",
+  "namespace": "test",
+  "fields": [
+    {
+      "name": "key",
+      "type": "string"
+    },
+    {
+      "name": "ts",
+      "type": "long"
+    },
+    {
+      "name": "level",
+      "type": "string"
+    },
+    {
+      "name": "severity",
+      "type": ["null", "int"],
+      "default": null
+    },
+    {
+      "name": "double_field",
+      "type": "double",
+      "default": 0.0
+    },
+    {
+      "name": "float_field",
+      "type": "float",
+      "default": 0.0
+    },
+    {
+      "name": "int_field",
+      "type": "int",
+      "default": 0
+    },
+    {
+      "name": "long_field",
+      "type": "long",
+      "default": 0
+    },
+    {
+      "name": "boolean_field",
+      "type": "boolean",
+      "default": false
+    },
+    {
+      "name": "string_field",
+      "type": {
+        "type": "string",
+        "avro.java.string": "String"
+      },
+      "default": ""
+    },
+    {
+      "name": "bytes_field",
+      "type": "bytes",
+      "default": ""
+    }
+  ]
+}
\ No newline at end of file