Core: Add WriterFactory (#2873)
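
This adds a common WriterFactory interface for constructing data, equality
delete, and position delete writers, a BaseWriterFactory in the data module
that handles the format-specific wiring, and a Spark implementation. A rough
usage sketch, mirroring the new tests (the table, file factory, partition,
and row variables here are illustrative, not part of this change):

    SparkWriterFactory writerFactory = SparkWriterFactory.builderFor(table)
        .dataSchema(table.schema())
        .dataFileFormat(FileFormat.PARQUET)
        .build();

    EncryptedOutputFile outputFile = fileFactory.newOutputFile(table.spec(), partition);
    DataWriter<InternalRow> writer = writerFactory.newDataWriter(outputFile, table.spec(), partition);
    try (DataWriter<InternalRow> closeableWriter = writer) {
      closeableWriter.add(row);
    }
    DataFile dataFile = writer.toDataFile();
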
diff --git a/.baseline/checkstyle/checkstyle.xml b/.baseline/checkstyle/checkstyle.xml
index 9df746f..54be02b 100644
--- a/.baseline/checkstyle/checkstyle.xml
+++ b/.baseline/checkstyle/checkstyle.xml
@@ -97,6 +97,7 @@
org.apache.iceberg.IsolationLevel.*,
org.apache.iceberg.NullOrder.*,
org.apache.iceberg.MetadataTableType.*,
+ org.apache.iceberg.MetadataColumns.*,
org.apache.iceberg.SortDirection.*,
org.apache.iceberg.TableProperties.*,
org.apache.iceberg.types.Type.*,
diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
index a8eb2eb..e1cf096 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -44,6 +44,7 @@
Integer.MAX_VALUE - 101, "file_path", Types.StringType.get(), "Path of a file in which a deleted row is stored");
public static final NestedField DELETE_FILE_POS = NestedField.required(
Integer.MAX_VALUE - 102, "pos", Types.LongType.get(), "Ordinal position of a deleted row in the data file");
+ public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 4f66181..59715b0 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -321,6 +321,8 @@
public <T> DataWriter<T> build() throws IOException {
Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null when creating data writer for partitioned spec");
FileAppender<T> fileAppender = appenderBuilder.build();
return new DataWriter<>(fileAppender, FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder);
@@ -428,6 +430,10 @@
Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
Preconditions.checkState(createWriterFunc != null,
"Cannot create equality delete file unless createWriterFunc is set");
+ Preconditions.checkArgument(spec != null,
+ "Spec must not be null when creating equality delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
meta("delete-type", "equality");
meta("delete-field-ids", IntStream.of(equalityFieldIds)
@@ -446,6 +452,10 @@
public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+ Preconditions.checkArgument(spec != null,
+ "Spec must not be null when creating position delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
meta("delete-type", "position");
diff --git a/core/src/main/java/org/apache/iceberg/io/WriterFactory.java b/core/src/main/java/org/apache/iceberg/io/WriterFactory.java
new file mode 100644
index 0000000..e797c1e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/WriterFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+
+/**
+ * A factory for creating data and delete writers.
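+ *
+ * <p>A non-null partition must be passed whenever the given spec is partitioned; the format
+ * write builders check this precondition when constructing writers.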
+ */
+public interface WriterFactory<T> {
+
+ /**
+ * Creates a new {@link DataWriter}.
+ *
+ * @param file the output file
+ * @param spec the partition spec the written data belongs to
+ * @param partition the partition the written data belongs to, or null if the spec is unpartitioned
+ * @return the constructed data writer
+ */
+ DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
+
+ /**
+ * Creates a new {@link EqualityDeleteWriter}.
+ *
+ * @param file the output file
+ * @param spec the partition spec the written deletes belong to
+ * @param partition the partition the written deletes belong to, or null if the spec is unpartitioned
+ * @return the constructed equality delete writer
+ */
+ EqualityDeleteWriter<T> newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
+
+ /**
+ * Creates a new {@link PositionDeleteWriter}.
+ *
+ * @param file the output file
+ * @param spec the partition spec the written deletes belong to
+ * @param partition the partition the written deletes belong to, or null if the spec is unpartitioned
+ * @return the constructed position delete writer
+ */
+ PositionDeleteWriter<T> newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java
new file mode 100644
index 0000000..72c0ea5
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/BaseWriterFactory.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.WriterFactory;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+
+/**
+ * A base writer factory to be extended by query engine integrations.
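+ *
+ * <p>This class wires up the format-specific write builders (Avro, Parquet, ORC) and delegates
+ * engine-specific configuration, such as setting a writer function for the engine's row type,
+ * to the {@code configure*} hooks implemented by subclasses.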
+ */
+public abstract class BaseWriterFactory<T> implements WriterFactory<T> {
+ private final Table table;
+ private final FileFormat dataFileFormat;
+ private final Schema dataSchema;
+ private final SortOrder dataSortOrder;
+ private final FileFormat deleteFileFormat;
+ private final int[] equalityFieldIds;
+ private final Schema equalityDeleteRowSchema;
+ private final SortOrder equalityDeleteSortOrder;
+ private final Schema positionDeleteRowSchema;
+
+ protected BaseWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema,
+ SortOrder dataSortOrder, FileFormat deleteFileFormat,
+ int[] equalityFieldIds, Schema equalityDeleteRowSchema,
+ SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema) {
+ this.table = table;
+ this.dataFileFormat = dataFileFormat;
+ this.dataSchema = dataSchema;
+ this.dataSortOrder = dataSortOrder;
+ this.deleteFileFormat = deleteFileFormat;
+ this.equalityFieldIds = equalityFieldIds;
+ this.equalityDeleteRowSchema = equalityDeleteRowSchema;
+ this.equalityDeleteSortOrder = equalityDeleteSortOrder;
+ this.positionDeleteRowSchema = positionDeleteRowSchema;
+ }
+
+ protected abstract void configureDataWrite(Avro.DataWriteBuilder builder);
+ protected abstract void configureEqualityDelete(Avro.DeleteWriteBuilder builder);
+ protected abstract void configurePositionDelete(Avro.DeleteWriteBuilder builder);
+
+ protected abstract void configureDataWrite(Parquet.DataWriteBuilder builder);
+ protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder);
+ protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder);
+
+ // TODO: provide ways to configure ORC delete writers once we support them
+ protected abstract void configureDataWrite(ORC.DataWriteBuilder builder);
+
+ @Override
+ public DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ OutputFile outputFile = file.encryptingOutputFile();
+ EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+ Map<String, String> properties = table.properties();
+ MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
+
+ try {
+ switch (dataFileFormat) {
+ case AVRO:
+ Avro.DataWriteBuilder avroBuilder = Avro.writeData(outputFile)
+ .schema(dataSchema)
+ .setAll(properties)
+ .metricsConfig(metricsConfig)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .withSortOrder(dataSortOrder)
+ .overwrite();
+
+ configureDataWrite(avroBuilder);
+
+ return avroBuilder.build();
+
+ case PARQUET:
+ Parquet.DataWriteBuilder parquetBuilder = Parquet.writeData(outputFile)
+ .schema(dataSchema)
+ .setAll(properties)
+ .metricsConfig(metricsConfig)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .withSortOrder(dataSortOrder)
+ .overwrite();
+
+ configureDataWrite(parquetBuilder);
+
+ return parquetBuilder.build();
+
+ case ORC:
+ ORC.DataWriteBuilder orcBuilder = ORC.writeData(outputFile)
+ .schema(dataSchema)
+ .setAll(properties)
+ .metricsConfig(metricsConfig)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .withSortOrder(dataSortOrder)
+ .overwrite();
+
+ configureDataWrite(orcBuilder);
+
+ return orcBuilder.build();
+
+ default:
+ throw new UnsupportedOperationException("Unsupported data file format: " + dataFileFormat);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new data writer", e);
+ }
+ }
+
+ @Override
+ public EqualityDeleteWriter<T> newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec,
+ StructLike partition) {
+ OutputFile outputFile = file.encryptingOutputFile();
+ EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+ Map<String, String> properties = table.properties();
+ MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
+
+ try {
+ switch (deleteFileFormat) {
+ case AVRO:
+ // TODO: support metrics configs in Avro equality delete writer
+
+ Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
+ .setAll(properties)
+ .rowSchema(equalityDeleteRowSchema)
+ .equalityFieldIds(equalityFieldIds)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .withSortOrder(equalityDeleteSortOrder)
+ .overwrite();
+
+ configureEqualityDelete(avroBuilder);
+
+ return avroBuilder.buildEqualityWriter();
+
+ case PARQUET:
+ Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
+ .setAll(properties)
+ .metricsConfig(metricsConfig)
+ .rowSchema(equalityDeleteRowSchema)
+ .equalityFieldIds(equalityFieldIds)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .withSortOrder(equalityDeleteSortOrder)
+ .overwrite();
+
+ configureEqualityDelete(parquetBuilder);
+
+ return parquetBuilder.buildEqualityWriter();
+
+ default:
+ throw new UnsupportedOperationException("Unsupported format for equality deletes: " + deleteFileFormat);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new equality delete writer", e);
+ }
+ }
+
+ @Override
+ public PositionDeleteWriter<T> newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec,
+ StructLike partition) {
+ OutputFile outputFile = file.encryptingOutputFile();
+ EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+ Map<String, String> properties = table.properties();
+
+ // TODO: build and pass a correct metrics config for position deletes
+
+ try {
+ switch (deleteFileFormat) {
+ case AVRO:
+ Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
+ .setAll(properties)
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .overwrite();
+
+ configurePositionDelete(avroBuilder);
+
+ return avroBuilder.buildPositionWriter();
+
+ case PARQUET:
+ Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
+ .setAll(properties)
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .overwrite();
+
+ configurePositionDelete(parquetBuilder);
+
+ return parquetBuilder.buildPositionWriter();
+
+ default:
+ throw new UnsupportedOperationException("Unsupported format for position deletes: " + deleteFileFormat);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new position delete writer", e);
+ }
+ }
+
+ protected Schema dataSchema() {
+ return dataSchema;
+ }
+
+ protected Schema equalityDeleteRowSchema() {
+ return equalityDeleteRowSchema;
+ }
+
+ protected Schema positionDeleteRowSchema() {
+ return positionDeleteRowSchema;
+ }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java
new file mode 100644
index 0000000..ec19326
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterFactory.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
+
+@RunWith(Parameterized.class)
+public abstract class TestWriterFactory<T> extends TableTestBase {
+ @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[]{FileFormat.AVRO, false},
+ new Object[]{FileFormat.AVRO, true},
+ new Object[]{FileFormat.PARQUET, false},
+ new Object[]{FileFormat.PARQUET, true},
+ new Object[]{FileFormat.ORC, false},
+ new Object[]{FileFormat.ORC, true}
+ };
+ }
+
+ private static final int TABLE_FORMAT_VERSION = 2;
+
+ private final FileFormat fileFormat;
+ private final boolean partitioned;
+ private final List<T> dataRows;
+
+ private StructLike partition = null;
+ private OutputFileFactory fileFactory = null;
+
+ public TestWriterFactory(FileFormat fileFormat, boolean partitioned) {
+ super(TABLE_FORMAT_VERSION);
+ this.fileFormat = fileFormat;
+ this.partitioned = partitioned;
+ this.dataRows = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "aaa"),
+ toRow(3, "aaa"),
+ toRow(4, "aaa"),
+ toRow(5, "aaa")
+ );
+ }
+
+ protected abstract WriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema, Schema positionDeleteRowSchema);
+
+ protected abstract T toRow(Integer id, String data);
+
+ protected abstract StructLikeSet toSet(Iterable<T> records);
+
+ protected FileFormat format() {
+ return fileFormat;
+ }
+
+ @Before
+ public void setupTable() throws Exception {
+ this.tableDir = temp.newFolder();
+ Assert.assertTrue(tableDir.delete()); // will be recreated during table creation
+
+ this.metadataDir = new File(tableDir, "metadata");
+
+ if (partitioned) {
+ this.table = create(SCHEMA, SPEC);
+ this.partition = initPartitionKey();
+ } else {
+ this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+ this.partition = null;
+ }
+
+ this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+ }
+
+ @Test
+ public void testDataWriter() throws IOException {
+ WriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+ table.newRowDelta()
+ .addRows(dataFile)
+ .commit();
+
+ Assert.assertEquals("Records should match", toSet(dataRows), actualRowSet("*"));
+ }
+
+ @Test
+ public void testEqualityDeleteWriter() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+ Schema equalityDeleteRowSchema = table.schema().select("id");
+ WriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+ // write a data file
+ DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+ // commit the written data file
+ table.newRowDelta()
+ .addRows(dataFile)
+ .commit();
+
+ // write an equality delete file
+ List<T> deletes = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(3, "bbb"),
+ toRow(5, "ccc")
+ );
+ DeleteFile deleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
+
+ // verify the written delete file
+ GenericRecord deleteRecord = GenericRecord.create(equalityDeleteRowSchema);
+ List<Record> expectedDeletes = ImmutableList.of(
+ deleteRecord.copy("id", 1),
+ deleteRecord.copy("id", 3),
+ deleteRecord.copy("id", 5)
+ );
+ InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
+ List<Record> actualDeletes = readFile(equalityDeleteRowSchema, inputDeleteFile);
+ Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
+
+ // commit the written delete file
+ table.newRowDelta()
+ .addDeletes(deleteFile)
+ .commit();
+
+ // verify the delete file is applied correctly
+ List<T> expectedRows = ImmutableList.of(
+ toRow(2, "aaa"),
+ toRow(4, "aaa")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+
+ @Test
+ public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+ Assume.assumeFalse("Table must start unpartitioned", partitioned);
+
+ List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+ Schema equalityDeleteRowSchema = table.schema().select("id");
+ WriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+ // write an unpartitioned data file
+ DataFile firstDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+ Assert.assertEquals("First data file must be unpartitioned", 0, firstDataFile.partition().size());
+
+ List<T> deletes = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "aaa"),
+ toRow(3, "aaa"),
+ toRow(4, "aaa")
+ );
+
+ // write an unpartitioned delete file
+ DeleteFile firstDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
+ Assert.assertEquals("First delete file must be unpartitioned", 0, firstDeleteFile.partition().size());
+
+ // commit the first data and delete files
+ table.newAppend()
+ .appendFile(firstDataFile)
+ .commit();
+ table.newRowDelta()
+ .addDeletes(firstDeleteFile)
+ .commit();
+
+ // evolve the spec
+ table.updateSpec()
+ .addField("data")
+ .commit();
+
+ partition = initPartitionKey();
+
+ // write a partitioned data file
+ DataFile secondDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+ Assert.assertEquals("Second data file must be partitioned", 1, secondDataFile.partition().size());
+
+ // write a partitioned delete file
+ DeleteFile secondDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
+ Assert.assertEquals("Second delete file must be artitioned", 1, secondDeleteFile.partition().size());
+
+ // commit the second data and delete files
+ table.newAppend()
+ .appendFile(secondDataFile)
+ .commit();
+ table.newRowDelta()
+ .addDeletes(secondDeleteFile)
+ .commit();
+
+ // verify both delete files are applied correctly; the surviving row from each data file collapses into one set entry
+ List<T> expectedRows = ImmutableList.of(
+ toRow(5, "aaa"),
+ toRow(5, "aaa")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+
+ @Test
+ public void testPositionDeleteWriter() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ WriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // write a data file
+ DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+ // write a position delete file
+ List<PositionDelete<T>> deletes = ImmutableList.of(
+ new PositionDelete<T>().set(dataFile.path(), 0L, null),
+ new PositionDelete<T>().set(dataFile.path(), 2L, null),
+ new PositionDelete<T>().set(dataFile.path(), 4L, null)
+ );
+ Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
+ DeleteFile deleteFile = result.first();
+ CharSequenceSet referencedDataFiles = result.second();
+
+ // verify the written delete file
+ GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+ List<Record> expectedDeletes = ImmutableList.of(
+ deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 0L),
+ deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 2L),
+ deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 4L)
+ );
+ InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
+ List<Record> actualDeletes = readFile(DeleteSchemaUtil.pathPosSchema(), inputDeleteFile);
+ Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
+
+ // commit the data and delete files
+ table.newRowDelta()
+ .addRows(dataFile)
+ .addDeletes(deleteFile)
+ .validateDataFilesExist(referencedDataFiles)
+ .validateDeletedFiles()
+ .commit();
+
+ // verify the delete file is applied correctly
+ List<T> expectedRows = ImmutableList.of(
+ toRow(2, "aaa"),
+ toRow(4, "aaa")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+
+ @Test
+ public void testPositionDeleteWriterWithRow() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ WriterFactory<T> writerFactory = newWriterFactory(table.schema(), table.schema());
+
+ // write a data file
+ DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
+
+ // write a position delete file and persist the deleted row
+ List<PositionDelete<T>> deletes = ImmutableList.of(
+ new PositionDelete<T>().set(dataFile.path(), 0L, dataRows.get(0))
+ );
+ Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
+ DeleteFile deleteFile = result.first();
+ CharSequenceSet referencedDataFiles = result.second();
+
+ // verify the written delete file
+ GenericRecord deletedRow = GenericRecord.create(table.schema());
+ Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(table.schema());
+ GenericRecord deleteRecord = GenericRecord.create(positionDeleteSchema);
+ Map<String, Object> deleteRecordColumns = ImmutableMap.of(
+ DELETE_FILE_PATH.name(), dataFile.path(),
+ DELETE_FILE_POS.name(), 0L,
+ DELETE_FILE_ROW_FIELD_NAME, deletedRow.copy("id", 1, "data", "aaa")
+ );
+ List<Record> expectedDeletes = ImmutableList.of(deleteRecord.copy(deleteRecordColumns));
+ InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
+ List<Record> actualDeletes = readFile(positionDeleteSchema, inputDeleteFile);
+ Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
+
+ // commit the data and delete files
+ table.newRowDelta()
+ .addRows(dataFile)
+ .addDeletes(deleteFile)
+ .validateDataFilesExist(referencedDataFiles)
+ .validateDeletedFiles()
+ .commit();
+
+ // verify the delete file is applied correctly
+ List<T> expectedRows = ImmutableList.of(
+ toRow(2, "aaa"),
+ toRow(3, "aaa"),
+ toRow(4, "aaa"),
+ toRow(5, "aaa")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+
+ private PartitionKey initPartitionKey() {
+ Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
+
+ PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+ partitionKey.partition(record);
+
+ return partitionKey;
+ }
+
+ private WriterFactory<T> newWriterFactory(Schema dataSchema) {
+ return newWriterFactory(dataSchema, null, null, null);
+ }
+
+ private WriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema) {
+ return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null);
+ }
+
+ private WriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
+ return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema);
+ }
+
+ private DataFile writeData(WriterFactory<T> writerFactory, List<T> rows,
+ PartitionSpec spec, StructLike partitionKey) throws IOException {
+
+ EncryptedOutputFile file = newOutputFile(spec, partitionKey);
+ DataWriter<T> writer = writerFactory.newDataWriter(file, spec, partitionKey);
+
+ try (DataWriter<T> closeableWriter = writer) {
+ for (T row : rows) {
+ closeableWriter.add(row);
+ }
+ }
+
+ return writer.toDataFile();
+ }
+
+ private DeleteFile writeEqualityDeletes(WriterFactory<T> writerFactory, List<T> deletes,
+ PartitionSpec spec, StructLike partitionKey) throws IOException {
+
+ EncryptedOutputFile file = newOutputFile(spec, partitionKey);
+ EqualityDeleteWriter<T> writer = writerFactory.newEqualityDeleteWriter(file, spec, partitionKey);
+
+ try (EqualityDeleteWriter<T> closeableWriter = writer) {
+ closeableWriter.deleteAll(deletes);
+ }
+
+ return writer.toDeleteFile();
+ }
+
+ private Pair<DeleteFile, CharSequenceSet> writePositionDeletes(WriterFactory<T> writerFactory,
+ List<PositionDelete<T>> deletes,
+ PartitionSpec spec,
+ StructLike partitionKey) throws IOException {
+
+ EncryptedOutputFile file = newOutputFile(spec, partitionKey);
+ PositionDeleteWriter<T> writer = writerFactory.newPositionDeleteWriter(file, spec, partitionKey);
+
+ try (PositionDeleteWriter<T> closeableWriter = writer) {
+ for (PositionDelete<T> delete : deletes) {
+ closeableWriter.delete(delete.path(), delete.pos(), delete.row());
+ }
+ }
+
+ return Pair.of(writer.toDeleteFile(), writer.referencedDataFiles());
+ }
+
+ private List<Record> readFile(Schema schema, InputFile inputFile) throws IOException {
+ switch (fileFormat) {
+ case PARQUET:
+ try (CloseableIterable<Record> records = Parquet.read(inputFile)
+ .project(schema)
+ .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+ .build()) {
+
+ return ImmutableList.copyOf(records);
+ }
+
+ case AVRO:
+ try (CloseableIterable<Record> records = Avro.read(inputFile)
+ .project(schema)
+ .createReaderFunc(DataReader::create)
+ .build()) {
+
+ return ImmutableList.copyOf(records);
+ }
+
+ default:
+ throw new UnsupportedOperationException("Unsupported read file format: " + fileFormat);
+ }
+ }
+
+ private StructLikeSet actualRowSet(String... columns) throws IOException {
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
+ reader.forEach(set::add);
+ }
+ return set;
+ }
+
+ private EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitionKey) {
+ return fileFactory.newOutputFile(spec, partitionKey);
+ }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 1ab5c70..417598b 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -215,6 +215,8 @@
public <T> DataWriter<T> build() {
Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null when creating data writer for partitioned spec");
FileAppender<T> fileAppender = appenderBuilder.build();
return new DataWriter<>(fileAppender, FileFormat.ORC, location, spec, partition, keyMetadata, sortOrder);
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index fef3209..aadddbf 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -456,6 +456,8 @@
public <T> DataWriter<T> build() throws IOException {
Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null when creating data writer for partitioned spec");
FileAppender<T> fileAppender = appenderBuilder.build();
return new DataWriter<>(fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder);
@@ -571,6 +573,10 @@
Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
Preconditions.checkState(createWriterFunc != null,
"Cannot create equality delete file unless createWriterFunc is set");
+ Preconditions.checkArgument(spec != null,
+ "Spec must not be null when creating equality delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
meta("delete-type", "equality");
meta("delete-field-ids", IntStream.of(equalityFieldIds)
@@ -589,6 +595,10 @@
public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+ Preconditions.checkArgument(spec != null,
+ "Spec must not be null when creating position delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
meta("delete-type", "position");
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java
new file mode 100644
index 0000000..1ce202d
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.BaseWriterFactory;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
+
+class SparkWriterFactory extends BaseWriterFactory<InternalRow> {
+ private StructType dataSparkType;
+ private StructType equalityDeleteSparkType;
+ private StructType positionDeleteSparkType;
+
+ SparkWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, StructType dataSparkType,
+ SortOrder dataSortOrder, FileFormat deleteFileFormat,
+ int[] equalityFieldIds, Schema equalityDeleteRowSchema, StructType equalityDeleteSparkType,
+ SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema,
+ StructType positionDeleteSparkType) {
+
+ super(table, dataFileFormat, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds,
+ equalityDeleteRowSchema, equalityDeleteSortOrder, positionDeleteRowSchema);
+
+ this.dataSparkType = dataSparkType;
+ this.equalityDeleteSparkType = equalityDeleteSparkType;
+ this.positionDeleteSparkType = positionDeleteSparkType;
+ }
+
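+ /**
+ * Returns a builder whose data and delete file formats default to the table's write
+ * properties. A minimal usage sketch (the configured schema is illustrative):
+ *
+ * <pre>
+ * SparkWriterFactory writerFactory = SparkWriterFactory.builderFor(table)
+ *     .dataSchema(table.schema())
+ *     .build();
+ * </pre>
+ */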
+ static Builder builderFor(Table table) {
+ return new Builder(table);
+ }
+
+ @Override
+ protected void configureDataWrite(Avro.DataWriteBuilder builder) {
+ builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
+ }
+
+ @Override
+ protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
+ builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
+ }
+
+ @Override
+ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
+ boolean withRow = positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
+ if (withRow) {
+ // SparkAvroWriter accepts just the Spark type of the row, ignoring the path and pos columns
+ StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
+ StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
+ builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
+ }
+ }
+
+ @Override
+ protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
+ builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
+ }
+
+ @Override
+ protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
+ builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
+ }
+
+ @Override
+ protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
+ builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
+ builder.transformPaths(path -> UTF8String.fromString(path.toString()));
+ }
+
+ @Override
+ protected void configureDataWrite(ORC.DataWriteBuilder builder) {
+ builder.createWriterFunc(SparkOrcWriter::new);
+ }
+
+ private StructType dataSparkType() {
+ if (dataSparkType == null) {
+ Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
+ this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
+ }
+
+ return dataSparkType;
+ }
+
+ private StructType equalityDeleteSparkType() {
+ if (equalityDeleteSparkType == null) {
+ Preconditions.checkNotNull(equalityDeleteRowSchema(), "Equality delete schema must not be null");
+ this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema());
+ }
+
+ return equalityDeleteSparkType;
+ }
+
+ private StructType positionDeleteSparkType() {
+ if (positionDeleteSparkType == null) {
+ // wrap the optional row schema into the position delete schema that contains path and position
+ Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+ this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema);
+ }
+
+ return positionDeleteSparkType;
+ }
+
+ static class Builder {
+ private final Table table;
+ private FileFormat dataFileFormat;
+ private Schema dataSchema;
+ private StructType dataSparkType;
+ private SortOrder dataSortOrder;
+ private FileFormat deleteFileFormat;
+ private int[] equalityFieldIds;
+ private Schema equalityDeleteRowSchema;
+ private StructType equalityDeleteSparkType;
+ private SortOrder equalityDeleteSortOrder;
+ private Schema positionDeleteRowSchema;
+ private StructType positionDeleteSparkType;
+
+ Builder(Table table) {
+ this.table = table;
+
+ Map<String, String> properties = table.properties();
+
+ String dataFileFormatName = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ this.dataFileFormat = FileFormat.valueOf(dataFileFormatName.toUpperCase(Locale.ENGLISH));
+
+ String deleteFileFormatName = properties.getOrDefault(DELETE_DEFAULT_FILE_FORMAT, dataFileFormatName);
+ this.deleteFileFormat = FileFormat.valueOf(deleteFileFormatName.toUpperCase(Locale.ENGLISH));
+ }
+
+ Builder dataFileFormat(FileFormat newDataFileFormat) {
+ this.dataFileFormat = newDataFileFormat;
+ return this;
+ }
+
+ Builder dataSchema(Schema newDataSchema) {
+ this.dataSchema = newDataSchema;
+ return this;
+ }
+
+ Builder dataSparkType(StructType newDataSparkType) {
+ this.dataSparkType = newDataSparkType;
+ return this;
+ }
+
+ Builder dataSortOrder(SortOrder newDataSortOrder) {
+ this.dataSortOrder = newDataSortOrder;
+ return this;
+ }
+
+ Builder deleteFileFormat(FileFormat newDeleteFileFormat) {
+ this.deleteFileFormat = newDeleteFileFormat;
+ return this;
+ }
+
+ Builder equalityFieldIds(int[] newEqualityFieldIds) {
+ this.equalityFieldIds = newEqualityFieldIds;
+ return this;
+ }
+
+ Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) {
+ this.equalityDeleteRowSchema = newEqualityDeleteRowSchema;
+ return this;
+ }
+
+ Builder equalityDeleteSparkType(StructType newEqualityDeleteSparkType) {
+ this.equalityDeleteSparkType = newEqualityDeleteSparkType;
+ return this;
+ }
+
+ Builder equalityDeleteSortOrder(SortOrder newEqualityDeleteSortOrder) {
+ this.equalityDeleteSortOrder = newEqualityDeleteSortOrder;
+ return this;
+ }
+
+ Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
+ this.positionDeleteRowSchema = newPositionDeleteRowSchema;
+ return this;
+ }
+
+ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
+ this.positionDeleteSparkType = newPositionDeleteSparkType;
+ return this;
+ }
+
+ SparkWriterFactory build() {
+ boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
+ boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
+ Preconditions.checkArgument(noEqualityDeleteConf || fullEqualityDeleteConf,
+ "Equality field IDs and equality delete row schema must be set together");
+
+ return new SparkWriterFactory(
+ table, dataFileFormat, dataSchema, dataSparkType, dataSortOrder, deleteFileFormat,
+ equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSparkType, equalityDeleteSortOrder,
+ positionDeleteRowSchema, positionDeleteSparkType);
+ }
+ }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java
new file mode 100644
index 0000000..eecbd66
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.TestWriterFactory;
+import org.apache.iceberg.io.WriterFactory;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class TestSparkWriterFactory extends TestWriterFactory<InternalRow> {
+
+ public TestSparkWriterFactory(FileFormat fileFormat, boolean partitioned) {
+ super(fileFormat, partitioned);
+ }
+
+ @Override
+ protected WriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ Schema positionDeleteRowSchema) {
+ return SparkWriterFactory.builderFor(table)
+ .dataSchema(table.schema())
+ .dataFileFormat(format())
+ .deleteFileFormat(format())
+ .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+ .equalityDeleteRowSchema(equalityDeleteRowSchema)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
+ .build();
+ }
+
+ @Override
+ protected InternalRow toRow(Integer id, String data) {
+ InternalRow row = new GenericInternalRow(2);
+ row.update(0, id);
+ row.update(1, UTF8String.fromString(data));
+ return row;
+ }
+
+ @Override
+ protected StructLikeSet toSet(Iterable<InternalRow> rows) {
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ StructType sparkType = SparkSchemaUtil.convert(table.schema());
+ for (InternalRow row : rows) {
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ set.add(wrapper.wrap(row));
+ }
+ return set;
+ }
+}