Add Iceberg drop-partition integration test and supporting TestIcebergTable helpers
diff --git a/core/src/test/java/io/onetable/TestIcebergTable.java b/core/src/test/java/io/onetable/TestIcebergTable.java
index a0d6380..c7addb9 100644
--- a/core/src/test/java/io/onetable/TestIcebergTable.java
+++ b/core/src/test/java/io/onetable/TestIcebergTable.java
@@ -65,6 +65,8 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
 
+import com.google.common.base.Preconditions;
+
 import io.onetable.iceberg.TestIcebergDataHelper;
 
 @Getter
@@ -140,11 +142,10 @@
     return records;
   }
 
-  @Override
   @SneakyThrows
-  public List<Record> insertRecordsForSpecialPartition(int numRows) {
+  public List<Record> insertRecordsForPartition(int numRows, String partitionValue) {
     List<Record> records =
-        icebergDataHelper.generateInsertRecordForPartition(numRows, SPECIAL_PARTITION_VALUE);
+        icebergDataHelper.generateInsertRecordForPartition(numRows, partitionValue);
     GenericAppenderFactory appenderFactory = new GenericAppenderFactory(icebergTable.schema());
     PartitionSpec spec = icebergTable.spec();
 
@@ -164,6 +165,12 @@
     return records;
   }
 
+  @Override
+  @SneakyThrows
+  public List<Record> insertRecordsForSpecialPartition(int numRows) {
+    return insertRecordsForPartition(numRows, SPECIAL_PARTITION_VALUE);
+  }
+
   @SneakyThrows
   @Override
   public void upsertRows(List<Record> recordsToUpdate) {
@@ -262,6 +269,17 @@
     return DEFAULT_RECORD_KEY_FIELD;
   }
 
+  public Map<String, List<Record>> groupRecordsByPartition(List<Record> records) {
+    Preconditions.checkArgument(
+        icebergDataHelper.getPartitionFieldNames().size() == 1,
+        "Only single partition field is supported for grouping records by partition");
+    Preconditions.checkArgument(
+        icebergDataHelper.getPartitionFieldNames().get(0).equals("level"),
+        "Only level partition field is supported for grouping records by partition");
+    return records.stream()
+        .collect(Collectors.groupingBy(record -> record.getField("level").toString()));
+  }
+
   @Override
   @SneakyThrows
   public void close() {
diff --git a/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java b/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java
index 08303ce..b02f337 100644
--- a/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java
+++ b/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java
@@ -28,9 +28,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -61,13 +63,12 @@
 
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
-  public void testIcebergSourceClient(boolean isPartitioned) {
+  public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
     String tableName = getTableName();
     try (TestIcebergTable testIcebergTable =
         TestIcebergTable.forStandardSchemaAndPartitioning(
             tableName, isPartitioned ? "level" : null, tempDir, hadoopConf)) {
       List<List<String>> allActiveFiles = new ArrayList<>();
-      List<List<String>> allBaseFilePaths = new ArrayList<>();
       List<TableChange> allTableChanges = new ArrayList<>();
 
       List<Record> records = testIcebergTable.insertRows(50);
@@ -118,6 +119,65 @@
     }
   }
 
+  @Test
+  public void testDropPartition() {
+    String tableName = getTableName();
+    try (TestIcebergTable testIcebergTable =
+        TestIcebergTable.forStandardSchemaAndPartitioning(
+            tableName, "level", tempDir, hadoopConf)) {
+      List<List<String>> allActiveFiles = new ArrayList<>();
+      List<TableChange> allTableChanges = new ArrayList<>();
+
+      List<Record> records = testIcebergTable.insertRows(50);
+      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+      List<Record> records1 = testIcebergTable.insertRows(50);
+      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+      List<Record> allRecords = new ArrayList<>();
+      allRecords.addAll(records);
+      allRecords.addAll(records1);
+
+      Map<String, List<Record>> recordsByPartition =
+          testIcebergTable.groupRecordsByPartition(allRecords);
+      String partitionValueToDelete = recordsByPartition.keySet().iterator().next();
+      testIcebergTable.deletePartition(partitionValueToDelete);
+      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+      // Insert a few records again for the deleted partition.
+      testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete);
+      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+      PerTableConfig tableConfig =
+          PerTableConfig.builder()
+              .tableName(testIcebergTable.getTableName())
+              .tableBasePath(testIcebergTable.getBasePath())
+              .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+              .build();
+      IcebergSourceClient icebergSourceClient = clientProvider.getSourceClientInstance(tableConfig);
+      assertEquals(
+          120 - recordsByPartition.get(partitionValueToDelete).size(),
+          testIcebergTable.getNumRows());
+      OneSnapshot oneSnapshot = icebergSourceClient.getCurrentSnapshot();
+
+      validateIcebergPartitioning(oneSnapshot);
+      validateOneSnapshot(oneSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+      // Get changes in incremental format.
+      InstantsForIncrementalSync instantsForIncrementalSync =
+          InstantsForIncrementalSync.builder()
+              .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+              .build();
+      CommitsBacklog<Long> commitsBacklog =
+          icebergSourceClient.getCommitsBacklog(instantsForIncrementalSync);
+      for (Long version : commitsBacklog.getCommitsToProcess()) {
+        TableChange tableChange = icebergSourceClient.getTableChangeForCommit(version);
+        allTableChanges.add(tableChange);
+      }
+      validateTableChanges(allActiveFiles, allTableChanges);
+    }
+  }
+
   private void validateIcebergPartitioning(OneSnapshot oneSnapshot) {
     List<OnePartitionField> partitionFields = oneSnapshot.getTable().getPartitioningFields();
     assertEquals(1, partitionFields.size());