Core: Add WriterFactory (#2873)

diff --git a/.baseline/checkstyle/checkstyle.xml b/.baseline/checkstyle/checkstyle.xml
index 9df746f..54be02b 100644
--- a/.baseline/checkstyle/checkstyle.xml
+++ b/.baseline/checkstyle/checkstyle.xml
@@ -97,6 +97,7 @@
                 org.apache.iceberg.IsolationLevel.*,
                 org.apache.iceberg.NullOrder.*,
                 org.apache.iceberg.MetadataTableType.*,
+                org.apache.iceberg.MetadataColumns.*,
                 org.apache.iceberg.SortDirection.*,
                 org.apache.iceberg.TableProperties.*,
                 org.apache.iceberg.types.Type.*,
diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
index a8eb2eb..e1cf096 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -44,6 +44,7 @@
       Integer.MAX_VALUE - 101, "file_path", Types.StringType.get(), "Path of a file in which a deleted row is stored");
   public static final NestedField DELETE_FILE_POS = NestedField.required(
       Integer.MAX_VALUE - 102, "pos", Types.LongType.get(), "Ordinal position of a deleted row in the data file");
+  public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
 
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 4f66181..59715b0 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -321,6 +321,8 @@
 
     public <T> DataWriter<T> build() throws IOException {
       Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null when creating data writer for partitioned spec");
 
       FileAppender<T> fileAppender = appenderBuilder.build();
       return new DataWriter<>(fileAppender, FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder);
@@ -428,6 +430,10 @@
       Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
       Preconditions.checkState(createWriterFunc != null,
           "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
 
       meta("delete-type", "equality");
       meta("delete-field-ids", IntStream.of(equalityFieldIds)
@@ -446,6 +452,10 @@
 
     public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
       Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
 
       meta("delete-type", "position");
 
diff --git a/core/src/main/java/org/apache/iceberg/io/WriterFactory.java b/core/src/main/java/org/apache/iceberg/io/WriterFactory.java
new file mode 100644
index 0000000..e797c1e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/WriterFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+
+/**
+ * A factory for creating data and delete writers.
+ */
+public interface WriterFactory<T> {
+
+  /**
+   * Creates a new {@link DataWriter}.
+   *
+   * @param file the output file
+   * @param spec the partition spec written data belongs to
+   * @param partition the partition written data belongs to or null if the spec is unpartitioned
+   * @return the constructed data writer
+   */
+  DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
+
+  /**
+   * Creates a new {@link EqualityDeleteWriter}.
+   *
+   * @param file the output file
+   * @param spec the partition spec written deletes belong to
+   * @param partition the partition written deletes belong to or null if the spec is unpartitioned
+   * @return the constructed equality delete writer
+   */
+  EqualityDeleteWriter<T> newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
+
+  /**
+   * Creates a new {@link PositionDeleteWriter}.
+   *
+   * @param file the output file
+   * @param spec the partition spec written deletes belong to
+   * @param partition the partition written deletes belong to or null if the spec is unpartitioned
+   * @return the constructed position delete writer
+   */
+  PositionDeleteWriter<T> newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java
new file mode 100644
index 0000000..72c0ea5
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.WriterFactory;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+
+/**
+ * A base writer factory to be extended by query engine integrations.
+ */
+public abstract class BaseWriterFactory<T> implements WriterFactory<T> {
+  private final Table table;
+  private final FileFormat dataFileFormat;
+  private final Schema dataSchema;
+  private final SortOrder dataSortOrder;
+  private final FileFormat deleteFileFormat;
+  private final int[] equalityFieldIds;
+  private final Schema equalityDeleteRowSchema;
+  private final SortOrder equalityDeleteSortOrder;
+  private final Schema positionDeleteRowSchema;
+
+  protected BaseWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema,
+                              SortOrder dataSortOrder, FileFormat deleteFileFormat,
+                              int[] equalityFieldIds, Schema equalityDeleteRowSchema,
+                              SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema) {
+    this.table = table;
+    this.dataFileFormat = dataFileFormat;
+    this.dataSchema = dataSchema;
+    this.dataSortOrder = dataSortOrder;
+    this.deleteFileFormat = deleteFileFormat;
+    this.equalityFieldIds = equalityFieldIds;
+    this.equalityDeleteRowSchema = equalityDeleteRowSchema;
+    this.equalityDeleteSortOrder = equalityDeleteSortOrder;
+    this.positionDeleteRowSchema = positionDeleteRowSchema;
+  }
+
+  protected abstract void configureDataWrite(Avro.DataWriteBuilder builder);
+  protected abstract void configureEqualityDelete(Avro.DeleteWriteBuilder builder);
+  protected abstract void configurePositionDelete(Avro.DeleteWriteBuilder builder);
+
+  protected abstract void configureDataWrite(Parquet.DataWriteBuilder builder);
+  protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder);
+  protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder);
+
+  // TODO: provide ways to configure ORC delete writers once we support them
+  protected abstract void configureDataWrite(ORC.DataWriteBuilder builder);
+
+  @Override
+  public DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    OutputFile outputFile = file.encryptingOutputFile();
+    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+    Map<String, String> properties = table.properties();
+    MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
+
+    try {
+      switch (dataFileFormat) {
+        case AVRO:
+          Avro.DataWriteBuilder avroBuilder = Avro.writeData(outputFile)
+              .schema(dataSchema)
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .withSortOrder(dataSortOrder)
+              .overwrite();
+
+          configureDataWrite(avroBuilder);
+
+          return avroBuilder.build();
+
+        case PARQUET:
+          Parquet.DataWriteBuilder parquetBuilder = Parquet.writeData(outputFile)
+              .schema(dataSchema)
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .withSortOrder(dataSortOrder)
+              .overwrite();
+
+          configureDataWrite(parquetBuilder);
+
+          return parquetBuilder.build();
+
+        case ORC:
+          ORC.DataWriteBuilder orcBuilder = ORC.writeData(outputFile)
+              .schema(dataSchema)
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .withSortOrder(dataSortOrder)
+              .overwrite();
+
+          configureDataWrite(orcBuilder);
+
+          return orcBuilder.build();
+
+        default:
+          throw new UnsupportedOperationException("Unsupported data file format: " + dataFileFormat);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public EqualityDeleteWriter<T> newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec,
+                                                         StructLike partition) {
+    OutputFile outputFile = file.encryptingOutputFile();
+    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+    Map<String, String> properties = table.properties();
+    MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
+
+    try {
+      switch (deleteFileFormat) {
+        case AVRO:
+          // TODO: support metrics configs in Avro equality delete writer
+
+          Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
+              .setAll(properties)
+              .rowSchema(equalityDeleteRowSchema)
+              .equalityFieldIds(equalityFieldIds)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .withSortOrder(equalityDeleteSortOrder)
+              .overwrite();
+
+          configureEqualityDelete(avroBuilder);
+
+          return avroBuilder.buildEqualityWriter();
+
+        case PARQUET:
+          Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .rowSchema(equalityDeleteRowSchema)
+              .equalityFieldIds(equalityFieldIds)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .withSortOrder(equalityDeleteSortOrder)
+              .overwrite();
+
+          configureEqualityDelete(parquetBuilder);
+
+          return parquetBuilder.buildEqualityWriter();
+
+        default:
+          throw new UnsupportedOperationException("Unsupported format for equality deletes: " + deleteFileFormat);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new equality delete writer", e);
+    }
+  }
+
+  @Override
+  public PositionDeleteWriter<T> newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec,
+                                                         StructLike partition) {
+    OutputFile outputFile = file.encryptingOutputFile();
+    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+    Map<String, String> properties = table.properties();
+
+    // TODO: build and pass a correct metrics config for position deletes
+
+    try {
+      switch (deleteFileFormat) {
+        case AVRO:
+          Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
+              .setAll(properties)
+              .rowSchema(positionDeleteRowSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .overwrite();
+
+          configurePositionDelete(avroBuilder);
+
+          return avroBuilder.buildPositionWriter();
+
+        case PARQUET:
+          Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
+              .setAll(properties)
+              .rowSchema(positionDeleteRowSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .overwrite();
+
+          configurePositionDelete(parquetBuilder);
+
+          return parquetBuilder.buildPositionWriter();
+
+        default:
+          throw new UnsupportedOperationException("Unsupported format for position deletes: " + deleteFileFormat);
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new position delete writer", e);
+    }
+  }
+
+  protected Schema dataSchema() {
+    return dataSchema;
+  }
+
+  protected Schema equalityDeleteRowSchema() {
+    return equalityDeleteRowSchema;
+  }
+
+  protected Schema positionDeleteRowSchema() {
+    return positionDeleteRowSchema;
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java
new file mode 100644
index 0000000..ec19326
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
+
+@RunWith(Parameterized.class)
+public abstract class TestWriterFactory<T> extends TableTestBase {
+  @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO, false},
+        new Object[]{FileFormat.AVRO, true},
+        new Object[]{FileFormat.PARQUET, false},
+        new Object[]{FileFormat.PARQUET, true},
+        new Object[]{FileFormat.ORC, false},
+        new Object[]{FileFormat.ORC, true}
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+
+  private final FileFormat fileFormat;
+  private final boolean partitioned;
+  private final List<T> dataRows;
+
+  private StructLike partition = null;
+  private OutputFileFactory fileFactory = null;
+
+  public TestWriterFactory(FileFormat fileFormat, boolean partitioned) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+    this.partitioned = partitioned;
+    this.dataRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "aaa"),
+        toRow(4, "aaa"),
+        toRow(5, "aaa")
+    );
+  }
+
+  protected abstract WriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                       Schema equalityDeleteRowSchema, Schema positionDeleteRowSchema);
+
+  protected abstract T toRow(Integer id, String data);
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    if (partitioned) {
+      this.table = create(SCHEMA, SPEC);
+      this.partition = initPartitionKey();
+    } else {
+      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+      this.partition = null;
+    }
+
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testDataWriter() throws IOException {
+    WriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+    table.newRowDelta()
+        .addRows(dataFile)
+        .commit();
+
+    Assert.assertEquals("Records should match", toSet(dataRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testEqualityDeleteWriter() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    WriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+    // write a data file
+    DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+    // commit the written data file
+    table.newRowDelta()
+        .addRows(dataFile)
+        .commit();
+
+    // write an equality delete file
+    List<T> deletes = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(3, "bbb"),
+        toRow(5, "ccc")
+    );
+    DeleteFile deleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
+
+    // verify the written delete file
+    GenericRecord deleteRecord = GenericRecord.create(equalityDeleteRowSchema);
+    List<Record> expectedDeletes = ImmutableList.of(
+        deleteRecord.copy("id", 1),
+        deleteRecord.copy("id", 3),
+        deleteRecord.copy("id", 5)
+    );
+    InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
+    List<Record> actualDeletes = readFile(equalityDeleteRowSchema, inputDeleteFile);
+    Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
+
+    // commit the written delete file
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
+
+    // verify the delete file is applied correctly
+    List<T> expectedRows = ImmutableList.of(
+        toRow(2, "aaa"),
+        toRow(4, "aaa")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+    Assume.assumeFalse("Table must start unpartitioned", partitioned);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    WriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+    // write an unpartitioned data file
+    DataFile firstDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+    Assert.assertEquals("First data file must be unpartitioned", 0, firstDataFile.partition().size());
+
+    List<T> deletes = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "aaa"),
+        toRow(4, "aaa")
+    );
+
+    // write an unpartitioned delete file
+    DeleteFile firstDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
+    Assert.assertEquals("First delete file must be unpartitioned", 0, firstDeleteFile.partition().size());
+
+    // commit the first data and delete files
+    table.newAppend()
+        .appendFile(firstDataFile)
+        .commit();
+    table.newRowDelta()
+        .addDeletes(firstDeleteFile)
+        .commit();
+
+    // evolve the spec
+    table.updateSpec()
+        .addField("data")
+        .commit();
+
+    partition = initPartitionKey();
+
+    // write a partitioned data file
+    DataFile secondDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+    Assert.assertEquals("Second data file must be partitioned", 1, secondDataFile.partition().size());
+
+    // write a partitioned delete file
+    DeleteFile secondDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
+    Assert.assertEquals("Second delete file must be artitioned", 1, secondDeleteFile.partition().size());
+
+    // commit the second data and delete files
+    table.newAppend()
+        .appendFile(secondDataFile)
+        .commit();
+    table.newRowDelta()
+        .addDeletes(secondDeleteFile)
+        .commit();
+
+    // verify both delete files are applied correctly
+    List<T> expectedRows = ImmutableList.of(
+        toRow(5, "aaa"),
+        toRow(5, "aaa")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testPositionDeleteWriter() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    WriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // write a data file
+    DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+    // write a position delete file
+    List<PositionDelete<T>> deletes = ImmutableList.of(
+        new PositionDelete<T>().set(dataFile.path(), 0L, null),
+        new PositionDelete<T>().set(dataFile.path(), 2L, null),
+        new PositionDelete<T>().set(dataFile.path(), 4L, null)
+    );
+    Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
+    DeleteFile deleteFile = result.first();
+    CharSequenceSet referencedDataFiles = result.second();
+
+    // verify the written delete file
+    GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+    List<Record> expectedDeletes = ImmutableList.of(
+        deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 0L),
+        deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 2L),
+        deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 4L)
+    );
+    InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
+    List<Record> actualDeletes = readFile(DeleteSchemaUtil.pathPosSchema(), inputDeleteFile);
+    Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
+
+    // commit the data and delete files
+    table.newRowDelta()
+        .addRows(dataFile)
+        .addDeletes(deleteFile)
+        .validateDataFilesExist(referencedDataFiles)
+        .validateDeletedFiles()
+        .commit();
+
+    // verify the delete file is applied correctly
+    List<T> expectedRows = ImmutableList.of(
+        toRow(2, "aaa"),
+        toRow(4, "aaa")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testPositionDeleteWriterWithRow() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    WriterFactory<T> writerFactory = newWriterFactory(table.schema(), table.schema());
+
+    // write a data file
+    DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+    // write a position delete file and persist the deleted row
+    List<PositionDelete<T>> deletes = ImmutableList.of(
+        new PositionDelete<T>().set(dataFile.path(), 0, dataRows.get(0))
+    );
+    Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
+    DeleteFile deleteFile = result.first();
+    CharSequenceSet referencedDataFiles = result.second();
+
+     // verify the written delete file
+    GenericRecord deletedRow = GenericRecord.create(table.schema());
+    Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(table.schema());
+    GenericRecord deleteRecord = GenericRecord.create(positionDeleteSchema);
+    Map<String, Object> deleteRecordColumns = ImmutableMap.of(
+        DELETE_FILE_PATH.name(), dataFile.path(),
+        DELETE_FILE_POS.name(), 0L,
+        DELETE_FILE_ROW_FIELD_NAME, deletedRow.copy("id", 1, "data", "aaa")
+    );
+    List<Record> expectedDeletes = ImmutableList.of(deleteRecord.copy(deleteRecordColumns));
+    InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
+    List<Record> actualDeletes = readFile(positionDeleteSchema, inputDeleteFile);
+    Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
+
+    // commit the data and delete files
+    table.newRowDelta()
+        .addRows(dataFile)
+        .addDeletes(deleteFile)
+        .validateDataFilesExist(referencedDataFiles)
+        .validateDeletedFiles()
+        .commit();
+
+    // verify the delete file is applied correctly
+    List<T> expectedRows = ImmutableList.of(
+        toRow(2, "aaa"),
+        toRow(3, "aaa"),
+        toRow(4, "aaa"),
+        toRow(5, "aaa")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
+
+  private PartitionKey initPartitionKey() {
+    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+
+    return partitionKey;
+  }
+
+  private WriterFactory<T> newWriterFactory(Schema dataSchema) {
+    return newWriterFactory(dataSchema, null, null, null);
+  }
+
+  private WriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                            Schema equalityDeleteRowSchema) {
+    return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null);
+  }
+
+  private WriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
+    return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema);
+  }
+
+  private DataFile writeData(WriterFactory<T> writerFactory, List<T> rows,
+                             PartitionSpec spec, StructLike partitionKey) throws IOException {
+
+    EncryptedOutputFile file = newOutputFile(spec, partitionKey);
+    DataWriter<T> writer = writerFactory.newDataWriter(file, spec, partitionKey);
+
+    try (DataWriter<T> closeableWriter = writer) {
+      for (T row : rows) {
+        closeableWriter.add(row);
+      }
+    }
+
+    return writer.toDataFile();
+  }
+
+  private DeleteFile writeEqualityDeletes(WriterFactory<T> writerFactory, List<T> deletes,
+                                          PartitionSpec spec, StructLike partitionKey) throws IOException {
+
+    EncryptedOutputFile file = newOutputFile(spec, partitionKey);
+    EqualityDeleteWriter<T> writer = writerFactory.newEqualityDeleteWriter(file, spec, partitionKey);
+
+    try (EqualityDeleteWriter<T> closableWriter = writer) {
+      closableWriter.deleteAll(deletes);
+    }
+
+    return writer.toDeleteFile();
+  }
+
+  private Pair<DeleteFile, CharSequenceSet> writePositionDeletes(WriterFactory<T> writerFactory,
+                                                                 List<PositionDelete<T>> deletes,
+                                                                 PartitionSpec spec,
+                                                                 StructLike partitionKey) throws IOException {
+
+    EncryptedOutputFile file = newOutputFile(spec, partitionKey);
+    PositionDeleteWriter<T> writer = writerFactory.newPositionDeleteWriter(file, spec, partitionKey);
+
+    try (PositionDeleteWriter<T> closableWriter = writer) {
+      for (PositionDelete<T> delete : deletes) {
+        closableWriter.delete(delete.path(), delete.pos(), delete.row());
+      }
+    }
+
+    return Pair.of(writer.toDeleteFile(), writer.referencedDataFiles());
+  }
+
+  private List<Record> readFile(Schema schema, InputFile inputFile) throws IOException {
+    switch (fileFormat) {
+      case PARQUET:
+        try (CloseableIterable<Record> records = Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+            .build()) {
+
+          return ImmutableList.copyOf(records);
+        }
+
+      case AVRO:
+        try (CloseableIterable<Record> records = Avro.read(inputFile)
+            .project(schema)
+            .createReaderFunc(DataReader::create)
+            .build()) {
+
+          return ImmutableList.copyOf(records);
+        }
+
+      default:
+        throw new UnsupportedOperationException("Unsupported read file format: " + fileFormat);
+    }
+  }
+
+  private StructLikeSet actualRowSet(String... columns) throws IOException {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
+      reader.forEach(set::add);
+    }
+    return set;
+  }
+
+  private EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitionKey) {
+    return fileFactory.newOutputFile(spec, partitionKey);
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 1ab5c70..417598b 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -215,6 +215,8 @@
 
     public <T> DataWriter<T> build() {
       Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null when creating data writer for partitioned spec");
 
       FileAppender<T> fileAppender = appenderBuilder.build();
       return new DataWriter<>(fileAppender, FileFormat.ORC, location, spec, partition, keyMetadata, sortOrder);
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index fef3209..aadddbf 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -456,6 +456,8 @@
 
     public <T> DataWriter<T> build() throws IOException {
       Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null when creating data writer for partitioned spec");
 
       FileAppender<T> fileAppender = appenderBuilder.build();
       return new DataWriter<>(fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder);
@@ -571,6 +573,10 @@
       Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
       Preconditions.checkState(createWriterFunc != null,
           "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
 
       meta("delete-type", "equality");
       meta("delete-field-ids", IntStream.of(equalityFieldIds)
@@ -589,6 +595,10 @@
 
     public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
       Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
 
       meta("delete-type", "position");
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java
new file mode 100644
index 0000000..1ce202d
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.BaseWriterFactory;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
+
+class SparkWriterFactory extends BaseWriterFactory<InternalRow> {
+  private StructType dataSparkType;
+  private StructType equalityDeleteSparkType;
+  private StructType positionDeleteSparkType;
+
+  SparkWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, StructType dataSparkType,
+                     SortOrder dataSortOrder, FileFormat deleteFileFormat,
+                     int[] equalityFieldIds, Schema equalityDeleteRowSchema, StructType equalityDeleteSparkType,
+                     SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema,
+                     StructType positionDeleteSparkType) {
+
+    super(table, dataFileFormat, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds,
+        equalityDeleteRowSchema, equalityDeleteSortOrder, positionDeleteRowSchema);
+
+    this.dataSparkType = dataSparkType;
+    this.equalityDeleteSparkType = equalityDeleteSparkType;
+    this.positionDeleteSparkType = positionDeleteSparkType;
+  }
+
+  static Builder builderFor(Table table) {
+    return new Builder(table);
+  }
+
+  @Override
+  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
+    builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
+  }
+
+  @Override
+  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
+    builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
+  }
+
+  @Override
+  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
+    boolean withRow = positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
+    if (withRow) {
+      // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos
+      StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
+      StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
+      builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
+    }
+  }
+
+  @Override
+  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
+    builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
+  }
+
+  @Override
+  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
+    builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
+  }
+
+  @Override
+  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
+    builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
+    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
+  }
+
+  @Override
+  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
+    builder.createWriterFunc(SparkOrcWriter::new);
+  }
+
+  private StructType dataSparkType() {
+    if (dataSparkType == null) {
+      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
+      this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
+    }
+
+    return dataSparkType;
+  }
+
+  private StructType equalityDeleteSparkType() {
+    if (equalityDeleteSparkType == null) {
+      Preconditions.checkNotNull(equalityDeleteRowSchema(), "Equality delete schema must not be null");
+      this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema());
+    }
+
+    return equalityDeleteSparkType;
+  }
+
+  private StructType positionDeleteSparkType() {
+    if (positionDeleteSparkType == null) {
+      // wrap the optional row schema into the position delete schema that contains path and position
+      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+      this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema);
+    }
+
+    return positionDeleteSparkType;
+  }
+
+  static class Builder {
+    private final Table table;
+    private FileFormat dataFileFormat;
+    private Schema dataSchema;
+    private StructType dataSparkType;
+    private SortOrder dataSortOrder;
+    private FileFormat deleteFileFormat;
+    private int[] equalityFieldIds;
+    private Schema equalityDeleteRowSchema;
+    private StructType equalityDeleteSparkType;
+    private SortOrder equalityDeleteSortOrder;
+    private Schema positionDeleteRowSchema;
+    private StructType positionDeleteSparkType;
+
+    Builder(Table table) {
+      this.table = table;
+
+      Map<String, String> properties = table.properties();
+
+      String dataFileFormatName = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+      this.dataFileFormat = FileFormat.valueOf(dataFileFormatName.toUpperCase(Locale.ENGLISH));
+
+      String deleteFileFormatName = properties.getOrDefault(DELETE_DEFAULT_FILE_FORMAT, dataFileFormatName);
+      this.deleteFileFormat = FileFormat.valueOf(deleteFileFormatName.toUpperCase(Locale.ENGLISH));
+    }
+
+    Builder dataFileFormat(FileFormat newDataFileFormat) {
+      this.dataFileFormat = newDataFileFormat;
+      return this;
+    }
+
+    Builder dataSchema(Schema newDataSchema) {
+      this.dataSchema = newDataSchema;
+      return this;
+    }
+
+    Builder dataSparkType(StructType newDataSparkType) {
+      this.dataSparkType = newDataSparkType;
+      return this;
+    }
+
+    Builder dataSortOrder(SortOrder newDataSortOrder) {
+      this.dataSortOrder = newDataSortOrder;
+      return this;
+    }
+
+    Builder deleteFileFormat(FileFormat newDeleteFileFormat) {
+      this.deleteFileFormat = newDeleteFileFormat;
+      return this;
+    }
+
+    Builder equalityFieldIds(int[] newEqualityFieldIds) {
+      this.equalityFieldIds = newEqualityFieldIds;
+      return this;
+    }
+
+    Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) {
+      this.equalityDeleteRowSchema = newEqualityDeleteRowSchema;
+      return this;
+    }
+
+    Builder equalityDeleteSparkType(StructType newEqualityDeleteSparkType) {
+      this.equalityDeleteSparkType = newEqualityDeleteSparkType;
+      return this;
+    }
+
+    Builder equalityDeleteSortOrder(SortOrder newEqualityDeleteSortOrder) {
+      this.equalityDeleteSortOrder = newEqualityDeleteSortOrder;
+      return this;
+    }
+
+    Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
+      this.positionDeleteRowSchema = newPositionDeleteRowSchema;
+      return this;
+    }
+
+    Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
+      this.positionDeleteSparkType = newPositionDeleteSparkType;
+      return this;
+    }
+
+    SparkWriterFactory build() {
+      boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
+      boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
+      Preconditions.checkArgument(noEqualityDeleteConf || fullEqualityDeleteConf,
+          "Equality field IDs and equality delete row schema must be set together");
+
+      return new SparkWriterFactory(
+          table, dataFileFormat, dataSchema, dataSparkType, dataSortOrder, deleteFileFormat,
+          equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSparkType, equalityDeleteSortOrder,
+          positionDeleteRowSchema, positionDeleteSparkType);
+    }
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java
new file mode 100644
index 0000000..eecbd66
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.TestWriterFactory;
+import org.apache.iceberg.io.WriterFactory;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class TestSparkWriterFactory extends TestWriterFactory<InternalRow> {
+
+  public TestSparkWriterFactory(FileFormat fileFormat, boolean partitioned) {
+    super(fileFormat, partitioned);
+  }
+
+  @Override
+  protected WriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                        Schema equalityDeleteRowSchema,
+                                                        Schema positionDeleteRowSchema) {
+    return SparkWriterFactory.builderFor(table)
+        .dataSchema(table.schema())
+        .dataFileFormat(format())
+        .deleteFileFormat(format())
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
+        .build();
+  }
+
+  @Override
+  protected InternalRow toRow(Integer id, String data) {
+    InternalRow row = new GenericInternalRow(2);
+    row.update(0, id);
+    row.update(1, UTF8String.fromString(data));
+    return row;
+  }
+
+  @Override
+  protected StructLikeSet toSet(Iterable<InternalRow> rows) {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    StructType sparkType = SparkSchemaUtil.convert(table.schema());
+    for (InternalRow row : rows) {
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      set.add(wrapper.wrap(row));
+    }
+    return set;
+  }
+}