Avoid unnecessary Iceberg datafile to onedatafile conversions (#330)

This is a performance optimization that extends to IcebergClient the improvements previously made to DeltaClient.

The current code in the Iceberg client generates unnecessary objects when computing the file diff to find new and removed files. The process first converts all table format data files of the current snapshot to OneDataFiles, uses OneDataFiles to compute the diff, and then converts the resulting OneDataFiles collection back to table format data file objects for writing. There is an unnecessary round trip here. For large tables with thousands of data files in a snapshot, this results in the creation of a large number of objects unnecessarily.

This change optimizes this process by skipping the unnecessary conversions. This optimization does not change the behavior of the translation. This change does not break backward compatibility and is already covered by existing tests.
diff --git a/api/src/main/java/io/onetable/model/storage/DataFilesDiff.java b/api/src/main/java/io/onetable/model/storage/DataFilesDiff.java
new file mode 100644
index 0000000..efcf45b
--- /dev/null
+++ b/api/src/main/java/io/onetable/model/storage/DataFilesDiff.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package io.onetable.model.storage;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+import lombok.Singular;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Holds the collection of files that represent the difference between two states/commits/snapshots
+ * of a table with respect to the data files. Between any two states of a table, the newer/latest
+ * state may contain new files not present in the older state and may have removed files that were
+ * present in the older state. In most cases the data files included in the newer state are derived
+ * from a new commit in a source table format that has not been applied to a target table format
+ * yet. Hence, the collection of data files in the newer state are typically {@link OneDataFile}s,
+ * whereas the files in the older state are represented using a generic type P which can be a data
+ * file type specific to the target table format.
+ *
+ * @param <L> the type of the files in the latest state
+ * @param <P> the type of the files in the target table format
+ */
+@Data
+@SuperBuilder
+public class DataFilesDiff<L, P> {
+  @Singular("fileAdded")
+  private Set<L> filesAdded;
+
+  @Singular("fileRemoved")
+  private Set<P> filesRemoved;
+
+  /**
+   * Compares the latest files with the previous files and identifies the files that are new, i.e.
+   * are present in latest files but not present in the previously known files, and the files that
+   * are removed, i.e. present in the previously known files but not present in the latest files.
+   *
+   * @param latestFiles a map of file path to file object for files in the latest table snapshot
+   * @param previousFiles a map of file path to file object for files in a previously synced
+   *     snapshot of a table
+   * @param <L> the type of the latest files
+   * @param <P> the type of the previous files
+   * @return the diff of the files
+   */
+  public static <L, P> DataFilesDiff<L, P> findNewAndRemovedFiles(
+      Map<String, L> latestFiles, Map<String, P> previousFiles) {
+    Set<L> newFiles = new HashSet<>();
+    Map<String, P> removedFiles = new HashMap<>(previousFiles);
+
+    // if a file in latest files is also present in previous files, then it is neither new nor
+    // removed.
+    latestFiles.forEach(
+        (key, value) -> {
+          boolean notAKnownFile = removedFiles.remove(key) == null;
+          if (notAKnownFile) {
+            newFiles.add(value);
+          }
+        });
+    return DataFilesDiff.<L, P>builder()
+        .filesAdded(newFiles)
+        .filesRemoved(removedFiles.values())
+        .build();
+  }
+
+  /**
+   * This method wraps the {@link #findNewAndRemovedFiles(Map, Map)} method, to compare the latest
+   * file groups with the previous files and identifies the files that are new, i.e. are present in
+   * latest files but not present in the previously known files, and the files that are removed,
+   * i.e. present in the previously known files but not present in the latest files.
+   *
+   * @param latestFileGroups a list of file groups representing the latest snapshot of a table
+   * @param previousFiles a map of file path and file object representing files in a previously
+   *     synced snapshot of a table
+   * @param <P> the type of the previous files
+   * @return the diff holding the added and the removed files
+   */
+  public static <P> DataFilesDiff<OneDataFile, P> findNewAndRemovedFiles(
+      List<OneFileGroup> latestFileGroups, Map<String, P> previousFiles) {
+    Map<String, OneDataFile> latestFiles =
+        latestFileGroups.stream()
+            .flatMap(group -> group.getFiles().stream())
+            .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
+    return findNewAndRemovedFiles(latestFiles, previousFiles);
+  }
+}
diff --git a/api/src/main/java/io/onetable/model/storage/OneDataFilesDiff.java b/api/src/main/java/io/onetable/model/storage/OneDataFilesDiff.java
index dc516d9..394a690 100644
--- a/api/src/main/java/io/onetable/model/storage/OneDataFilesDiff.java
+++ b/api/src/main/java/io/onetable/model/storage/OneDataFilesDiff.java
@@ -18,27 +18,20 @@
  
 package io.onetable.model.storage;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import lombok.Builder;
-import lombok.Singular;
+import lombok.EqualsAndHashCode;
 import lombok.Value;
+import lombok.experimental.SuperBuilder;
 
 /** Container for holding the list of files added and files removed between source and target. */
 @Value
-@Builder
-public class OneDataFilesDiff {
-  @Singular("fileAdded")
-  Set<OneDataFile> filesAdded;
-
-  @Singular("fileRemoved")
-  Set<OneDataFile> filesRemoved;
+@EqualsAndHashCode(callSuper = true)
+@SuperBuilder
+public class OneDataFilesDiff extends DataFilesDiff<OneDataFile, OneDataFile> {
 
   /**
    * Creates a OneDataFilesDiff from the list of files in the target table and the list of files in
@@ -52,19 +45,14 @@
     Map<String, OneDataFile> targetPaths =
         target.stream()
             .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
-    // Any files in the source that are not in the target are added
-    Set<OneDataFile> addedFiles =
+    Map<String, OneDataFile> sourcePaths =
         source.stream()
-            .map(
-                file -> {
-                  OneDataFile targetFileIfPresent = targetPaths.remove(file.getPhysicalPath());
-                  return targetFileIfPresent == null ? file : null;
-                })
-            .filter(Objects::nonNull)
-            .collect(Collectors.toSet());
-    // Any files remaining in the targetPaths map are not present in the source and should be marked
-    // for removal
-    Set<OneDataFile> removedFiles = new HashSet<>(targetPaths.values());
-    return OneDataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+            .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
+
+    DataFilesDiff<OneDataFile, OneDataFile> diff = findNewAndRemovedFiles(sourcePaths, targetPaths);
+    return OneDataFilesDiff.builder()
+        .filesAdded(diff.getFilesAdded())
+        .filesRemoved(diff.getFilesRemoved())
+        .build();
   }
 }
diff --git a/api/src/test/java/io/onetable/model/storage/TestDataFilesDiff.java b/api/src/test/java/io/onetable/model/storage/TestDataFilesDiff.java
new file mode 100644
index 0000000..83f8ae7
--- /dev/null
+++ b/api/src/test/java/io/onetable/model/storage/TestDataFilesDiff.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package io.onetable.model.storage;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+public class TestDataFilesDiff {
+  @Test
+  void findDiffFromFileGroups() {
+    OneDataFile file1Group1 = OneDataFile.builder().physicalPath("file1Group1").build();
+    OneDataFile file2Group1 = OneDataFile.builder().physicalPath("file2Group1").build();
+    OneDataFile file1Group2 = OneDataFile.builder().physicalPath("file1Group2").build();
+    OneDataFile file2Group2 = OneDataFile.builder().physicalPath("file2Group2").build();
+
+    List<OneFileGroup> latestFileGroups =
+        OneFileGroup.fromFiles(Arrays.asList(file1Group1, file2Group1, file1Group2, file2Group2));
+
+    Map<String, File> previousFiles = new HashMap<>();
+    File file1 = mock(File.class);
+    File file2 = mock(File.class);
+    File file3 = mock(File.class);
+    previousFiles.put("file1Group1", file1);
+    previousFiles.put("file2NoGroup", file2);
+    previousFiles.put("file2Group2", file3);
+
+    DataFilesDiff<OneDataFile, File> diff =
+        DataFilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles);
+    assertEquals(2, diff.getFilesAdded().size());
+    assertTrue(diff.getFilesAdded().contains(file1Group2));
+    assertTrue(diff.getFilesAdded().contains(file2Group1));
+    assertEquals(1, diff.getFilesRemoved().size());
+    assertTrue(diff.getFilesRemoved().contains(file2));
+  }
+
+  @Test
+  void findDiffFromFilesNoPrevious() {
+    File file1 = mock(File.class);
+    File file2 = mock(File.class);
+
+    Map<String, File> previousFiles = new HashMap<>();
+    Map<String, File> latestFiles = new HashMap<>();
+    latestFiles.put("file1", file1);
+    latestFiles.put("file2", file2);
+
+    DataFilesDiff<File, File> diff =
+        DataFilesDiff.findNewAndRemovedFiles(latestFiles, previousFiles);
+    assertEquals(0, diff.getFilesRemoved().size());
+    assertEquals(2, diff.getFilesAdded().size());
+    assertTrue(diff.getFilesAdded().contains(file1));
+    assertTrue(diff.getFilesAdded().contains(file2));
+  }
+
+  @Test
+  void findDiffFromFilesNoNew() {
+    File file1 = mock(File.class);
+    File file2 = mock(File.class);
+
+    Map<String, File> previousFiles = new HashMap<>();
+    previousFiles.put("file1", file1);
+    previousFiles.put("file2", file2);
+
+    Map<String, File> latestFiles = new HashMap<>();
+    latestFiles.put("file1", file1);
+    latestFiles.put("file2", file2);
+
+    DataFilesDiff<File, File> diff =
+        DataFilesDiff.findNewAndRemovedFiles(latestFiles, previousFiles);
+    assertEquals(0, diff.getFilesRemoved().size());
+    assertEquals(0, diff.getFilesAdded().size());
+  }
+
+  @Test
+  void findDiffFromFiles() {
+    File file1 = mock(File.class);
+    File file2 = mock(File.class);
+    File file3 = mock(File.class);
+
+    Map<String, File> previousFiles = new HashMap<>();
+    previousFiles.put("file1", file1);
+    previousFiles.put("file2", file2);
+
+    Map<String, File> latestFiles = new HashMap<>();
+    latestFiles.put("file2", file2);
+    latestFiles.put("file3", file3);
+
+    DataFilesDiff<File, File> diff =
+        DataFilesDiff.findNewAndRemovedFiles(latestFiles, previousFiles);
+    assertEquals(1, diff.getFilesAdded().size());
+    assertTrue(diff.getFilesAdded().contains(file3));
+    assertEquals(1, diff.getFilesRemoved().size());
+    assertTrue(diff.getFilesRemoved().contains(file1));
+  }
+}
diff --git a/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java b/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java
index 276f320..e89e31b 100644
--- a/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java
+++ b/core/src/main/java/io/onetable/delta/DeltaDataFileUpdatesExtractor.java
@@ -39,6 +39,7 @@
 
 import io.onetable.model.schema.OneSchema;
 import io.onetable.model.stat.ColumnStat;
+import io.onetable.model.storage.DataFilesDiff;
 import io.onetable.model.storage.OneDataFile;
 import io.onetable.model.storage.OneDataFilesDiff;
 import io.onetable.model.storage.OneFileGroup;
@@ -63,7 +64,7 @@
     // all files in the current delta snapshot are potential candidates for remove actions, i.e. if
     // the file is not present in the new snapshot (addedFiles) then the file is considered removed
     Snapshot snapshot = deltaLog.snapshot();
-    Map<String, Action> removedFiles =
+    Map<String, Action> previousFiles =
         snapshot.allFiles().collectAsList().stream()
             .map(AddFile::remove)
             .collect(
@@ -71,19 +72,11 @@
                     file -> DeltaActionsConverter.getFullPathToFile(snapshot, file.path()),
                     file -> file));
 
-    Set<OneDataFile> addedFiles =
-        partitionedDataFiles.stream()
-            .flatMap(group -> group.getFiles().stream())
-            .map(
-                file -> {
-                  Action targetFileIfPresent = removedFiles.remove(file.getPhysicalPath());
-                  return targetFileIfPresent == null ? file : null;
-                })
-            .filter(Objects::nonNull)
-            .collect(Collectors.toSet());
+    DataFilesDiff<OneDataFile, Action> diff =
+        OneDataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles);
 
     return applyDiff(
-        addedFiles, removedFiles.values(), tableSchema, deltaLog.dataPath().toString());
+        diff.getFilesAdded(), diff.getFilesRemoved(), tableSchema, deltaLog.dataPath().toString());
   }
 
   public Seq<Action> applyDiff(
diff --git a/core/src/main/java/io/onetable/iceberg/IcebergDataFileExtractor.java b/core/src/main/java/io/onetable/iceberg/IcebergDataFileExtractor.java
index 825424e..21c28b9 100644
--- a/core/src/main/java/io/onetable/iceberg/IcebergDataFileExtractor.java
+++ b/core/src/main/java/io/onetable/iceberg/IcebergDataFileExtractor.java
@@ -19,24 +19,18 @@
 package io.onetable.iceberg;
 
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 import lombok.Builder;
 
-import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterator;
 
 import io.onetable.exception.NotSupportedException;
-import io.onetable.model.OneTable;
 import io.onetable.model.schema.OneSchema;
 import io.onetable.model.stat.ColumnStat;
 import io.onetable.model.stat.PartitionValue;
 import io.onetable.model.storage.FileFormat;
 import io.onetable.model.storage.OneDataFile;
-import io.onetable.spi.extractor.DataFileIterator;
 
 /** Extractor of data files for Iceberg */
 @Builder
@@ -47,87 +41,6 @@
       IcebergPartitionValueConverter.getInstance();
 
   /**
-   * Initializes an iterator for Iceberg files.
-   *
-   * @return Iceberg table file iterator
-   */
-  public DataFileIterator iterator(Table iceTable, OneTable oneTable) {
-    return new IcebergDataFileIterator(iceTable, oneTable);
-  }
-
-  public class IcebergDataFileIterator implements DataFileIterator {
-    private final Table iceTable;
-    private final OneTable oneTable;
-    private final CloseableIterator<CombinedScanTask> iceScan;
-    private Iterator<OneDataFile> currentScanTaskIterator;
-
-    private IcebergDataFileIterator(Table iceTable, OneTable oneTable) {
-      this.iceTable = iceTable;
-      this.oneTable = oneTable;
-      this.iceScan = iceTable.newScan().planTasks().iterator();
-      this.currentScanTaskIterator =
-          iceScan.hasNext() ? getCurrentScanTaskIterator(iceScan.next()) : null;
-    }
-
-    @Override
-    public void close() throws Exception {
-      iceScan.close();
-    }
-
-    @Override
-    public boolean hasNext() {
-      advanceScanTask();
-      return currentScanTaskIterator != null && currentScanTaskIterator.hasNext();
-    }
-
-    @Override
-    public OneDataFile next() {
-      if (currentScanTaskIterator == null) {
-        throw new IllegalStateException("Iterator is not initialized");
-      }
-      advanceScanTask();
-      return currentScanTaskIterator.next();
-    }
-
-    private void advanceScanTask() {
-      if (currentScanTaskIterator != null && currentScanTaskIterator.hasNext()) {
-        return;
-      }
-      if (iceScan.hasNext()) {
-        currentScanTaskIterator = getCurrentScanTaskIterator(iceScan.next());
-      } else {
-        currentScanTaskIterator = null;
-      }
-    }
-
-    private Iterator<OneDataFile> getCurrentScanTaskIterator(CombinedScanTask scanTask) {
-      return scanTask.files().stream()
-          .map(
-              fileScanTask -> {
-                DataFile dataFile = fileScanTask.file();
-                List<PartitionValue> partitionValues =
-                    partitionValueConverter.toOneTable(
-                        oneTable, dataFile.partition(), iceTable.spec());
-                return fromIcebergWithoutColumnStats(dataFile, partitionValues);
-              })
-          .iterator();
-    }
-  }
-
-  /**
-   * Builds {@link OneDataFile} representation from Iceberg {@link DataFile} without any column
-   * statistics set. This can be used to reduce memory overhead when statistics are not required.
-   *
-   * @param dataFile Iceberg data file
-   * @param partitionValues representation of partition fields and ranges
-   * @return corresponding OneTable data file
-   */
-  OneDataFile fromIcebergWithoutColumnStats(
-      DataFile dataFile, List<PartitionValue> partitionValues) {
-    return fromIceberg(dataFile, partitionValues, null, false);
-  }
-
-  /**
    * Builds {@link OneDataFile} representation from Iceberg {@link DataFile}.
    *
    * @param dataFile Iceberg data file
diff --git a/core/src/main/java/io/onetable/iceberg/IcebergDataFileUpdatesSync.java b/core/src/main/java/io/onetable/iceberg/IcebergDataFileUpdatesSync.java
index 7dbfb16..e7cfb40 100644
--- a/core/src/main/java/io/onetable/iceberg/IcebergDataFileUpdatesSync.java
+++ b/core/src/main/java/io/onetable/iceberg/IcebergDataFileUpdatesSync.java
@@ -18,28 +18,22 @@
  
 package io.onetable.iceberg;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import lombok.AllArgsConstructor;
 
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.OverwriteFiles;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
+import org.apache.iceberg.*;
+import org.apache.iceberg.io.CloseableIterable;
 
 import io.onetable.exception.NotSupportedException;
 import io.onetable.exception.OneIOException;
 import io.onetable.model.OneTable;
+import io.onetable.model.storage.DataFilesDiff;
 import io.onetable.model.storage.OneDataFile;
 import io.onetable.model.storage.OneDataFilesDiff;
 import io.onetable.model.storage.OneFileGroup;
-import io.onetable.spi.extractor.DataFileIterator;
 
 @AllArgsConstructor(staticName = "of")
 public class IcebergDataFileUpdatesSync {
@@ -53,23 +47,20 @@
       List<OneFileGroup> partitionedDataFiles,
       Schema schema,
       PartitionSpec partitionSpec) {
-    List<OneDataFile> currentDataFiles = new ArrayList<>();
-    IcebergDataFileExtractor dataFileExtractor =
-        IcebergDataFileExtractor.builder().partitionValueConverter(partitionValueConverter).build();
-    try (DataFileIterator fileIterator = dataFileExtractor.iterator(table, oneTable)) {
-      fileIterator.forEachRemaining(currentDataFiles::add);
+
+    Map<String, DataFile> previousFiles = new HashMap<>();
+    try (CloseableIterable<FileScanTask> iterator = table.newScan().planFiles()) {
+      StreamSupport.stream(iterator.spliterator(), false)
+          .map(FileScanTask::file)
+          .forEach(file -> previousFiles.put(file.path().toString(), file));
     } catch (Exception e) {
       throw new OneIOException("Failed to iterate through Iceberg data files", e);
     }
 
-    // Sync the files diff
-    OneDataFilesDiff filesDiff =
-        OneDataFilesDiff.from(
-            partitionedDataFiles.stream()
-                .flatMap(group -> group.getFiles().stream())
-                .collect(Collectors.toList()),
-            currentDataFiles);
-    applyDiff(transaction, filesDiff, schema, partitionSpec);
+    DataFilesDiff<OneDataFile, DataFile> diff =
+        OneDataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles);
+
+    applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, partitionSpec);
   }
 
   public void applyDiff(
@@ -77,16 +68,24 @@
       OneDataFilesDiff oneDataFilesDiff,
       Schema schema,
       PartitionSpec partitionSpec) {
+
+    Collection<DataFile> filesRemoved =
+        oneDataFilesDiff.getFilesRemoved().stream()
+            .map(file -> getDataFile(partitionSpec, schema, file))
+            .collect(Collectors.toList());
+
+    applyDiff(transaction, oneDataFilesDiff.getFilesAdded(), filesRemoved, schema, partitionSpec);
+  }
+
+  private void applyDiff(
+      Transaction transaction,
+      Collection<OneDataFile> filesAdded,
+      Collection<DataFile> filesRemoved,
+      Schema schema,
+      PartitionSpec partitionSpec) {
     OverwriteFiles overwriteFiles = transaction.newOverwrite();
-    oneDataFilesDiff
-        .getFilesAdded()
-        .forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f)));
-    oneDataFilesDiff
-        .getFilesRemoved()
-        .forEach(
-            f ->
-                overwriteFiles.deleteFile(
-                    getDataFile(transaction.table().spec(), transaction.table().schema(), f)));
+    filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f)));
+    filesRemoved.forEach(overwriteFiles::deleteFile);
     overwriteFiles.commit();
   }
 
diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergSync.java b/core/src/test/java/io/onetable/iceberg/TestIcebergSync.java
index f1467e6..e442e76 100644
--- a/core/src/test/java/io/onetable/iceberg/TestIcebergSync.java
+++ b/core/src/test/java/io/onetable/iceberg/TestIcebergSync.java
@@ -264,7 +264,7 @@
             partitionSpecArgumentCaptor.capture(),
             partitionSpecArgumentCaptor.capture(),
             transactionArgumentCaptor.capture());
-    verify(mockColumnStatsConverter, times(4)).toIceberg(any(Schema.class), anyLong(), anyList());
+    verify(mockColumnStatsConverter, times(3)).toIceberg(any(Schema.class), anyLong(), anyList());
 
     // check that the correct schema is used in calls to the mocks
     // Since we're using a mockSchemaSync we don't expect the table schema used by the partition