/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
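
/**
 * Base tests for {@link WriterFactory} implementations. Subclasses supply an
 * engine-specific factory and row representation; the tests here cover data files,
 * equality delete files, and position delete files across file formats and
 * partitioned/unpartitioned specs.
 */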
@RunWith(Parameterized.class)
public abstract class TestWriterFactory<T> extends TableTestBase {
@Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
public static Object[] parameters() {
return new Object[][] {
new Object[]{FileFormat.AVRO, false},
new Object[]{FileFormat.AVRO, true},
new Object[]{FileFormat.PARQUET, false},
new Object[]{FileFormat.PARQUET, true},
new Object[]{FileFormat.ORC, false},
new Object[]{FileFormat.ORC, true}
};
}
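
  // row-level (equality and position) deletes require format version 2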
private static final int TABLE_FORMAT_VERSION = 2;
private final FileFormat fileFormat;
private final boolean partitioned;
private final List<T> dataRows;
private StructLike partition = null;
private OutputFileFactory fileFactory = null;

  public TestWriterFactory(FileFormat fileFormat, boolean partitioned) {
super(TABLE_FORMAT_VERSION);
this.fileFormat = fileFormat;
this.partitioned = partitioned;
this.dataRows = ImmutableList.of(
toRow(1, "aaa"),
toRow(2, "aaa"),
toRow(3, "aaa"),
toRow(4, "aaa"),
toRow(5, "aaa")
);
}
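
  /**
   * Creates the {@link WriterFactory} under test. The delete-related arguments may be
   * null when the corresponding writers are not exercised by a test.
   */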
protected abstract WriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
Schema equalityDeleteRowSchema, Schema positionDeleteRowSchema);

  protected abstract T toRow(Integer id, String data);

  protected abstract StructLikeSet toSet(Iterable<T> records);

  protected FileFormat format() {
return fileFormat;
}

  @Before
public void setupTable() throws Exception {
this.tableDir = temp.newFolder();
    Assert.assertTrue(tableDir.delete()); // will be recreated during table creation
this.metadataDir = new File(tableDir, "metadata");
if (partitioned) {
this.table = create(SCHEMA, SPEC);
this.partition = initPartitionKey();
} else {
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
this.partition = null;
}
this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
}

  @Test
public void testDataWriter() throws IOException {
WriterFactory<T> writerFactory = newWriterFactory(table.schema());
DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
table.newRowDelta()
.addRows(dataFile)
.commit();
Assert.assertEquals("Records should match", toSet(dataRows), actualRowSet("*"));
}

  @Test
public void testEqualityDeleteWriter() throws IOException {
Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
Schema equalityDeleteRowSchema = table.schema().select("id");
WriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
// write a data file
DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
// commit the written data file
table.newRowDelta()
.addRows(dataFile)
.commit();
// write an equality delete file
List<T> deletes = ImmutableList.of(
toRow(1, "aaa"),
toRow(3, "bbb"),
toRow(5, "ccc")
);
DeleteFile deleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
// verify the written delete file
GenericRecord deleteRecord = GenericRecord.create(equalityDeleteRowSchema);
List<Record> expectedDeletes = ImmutableList.of(
deleteRecord.copy("id", 1),
deleteRecord.copy("id", 3),
deleteRecord.copy("id", 5)
);
InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
List<Record> actualDeletes = readFile(equalityDeleteRowSchema, inputDeleteFile);
Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
// commit the written delete file
table.newRowDelta()
.addDeletes(deleteFile)
.commit();
// verify the delete file is applied correctly
List<T> expectedRows = ImmutableList.of(
toRow(2, "aaa"),
toRow(4, "aaa")
);
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}

  @Test
public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {
Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
Assume.assumeFalse("Table must start unpartitioned", partitioned);
List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
Schema equalityDeleteRowSchema = table.schema().select("id");
WriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
// write an unpartitioned data file
DataFile firstDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
Assert.assertEquals("First data file must be unpartitioned", 0, firstDataFile.partition().size());
List<T> deletes = ImmutableList.of(
toRow(1, "aaa"),
toRow(2, "aaa"),
toRow(3, "aaa"),
toRow(4, "aaa")
);
// write an unpartitioned delete file
DeleteFile firstDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
Assert.assertEquals("First delete file must be unpartitioned", 0, firstDeleteFile.partition().size());
// commit the first data and delete files
table.newAppend()
.appendFile(firstDataFile)
.commit();
table.newRowDelta()
.addDeletes(firstDeleteFile)
.commit();
// evolve the spec
table.updateSpec()
.addField("data")
.commit();
partition = initPartitionKey();
// write a partitioned data file
DataFile secondDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
Assert.assertEquals("Second data file must be partitioned", 1, secondDataFile.partition().size());
// write a partitioned delete file
DeleteFile secondDeleteFile = writeEqualityDeletes(writerFactory, deletes, table.spec(), partition);
Assert.assertEquals("Second delete file must be artitioned", 1, secondDeleteFile.partition().size());
// commit the second data and delete files
table.newAppend()
.appendFile(secondDataFile)
.commit();
table.newRowDelta()
.addDeletes(secondDeleteFile)
.commit();
    // verify both delete files are applied correctly: only row 5 survives, once in each data file
List<T> expectedRows = ImmutableList.of(
toRow(5, "aaa"),
toRow(5, "aaa")
);
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}

  @Test
public void testPositionDeleteWriter() throws IOException {
Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
WriterFactory<T> writerFactory = newWriterFactory(table.schema());
// write a data file
DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
// write a position delete file
List<PositionDelete<T>> deletes = ImmutableList.of(
new PositionDelete<T>().set(dataFile.path(), 0L, null),
new PositionDelete<T>().set(dataFile.path(), 2L, null),
new PositionDelete<T>().set(dataFile.path(), 4L, null)
);
Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
DeleteFile deleteFile = result.first();
CharSequenceSet referencedDataFiles = result.second();
// verify the written delete file
GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
List<Record> expectedDeletes = ImmutableList.of(
deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 0L),
deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 2L),
deleteRecord.copy(DELETE_FILE_PATH.name(), dataFile.path(), DELETE_FILE_POS.name(), 4L)
);
InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
List<Record> actualDeletes = readFile(DeleteSchemaUtil.pathPosSchema(), inputDeleteFile);
Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
// commit the data and delete files
table.newRowDelta()
.addRows(dataFile)
.addDeletes(deleteFile)
.validateDataFilesExist(referencedDataFiles)
.validateDeletedFiles()
.commit();
// verify the delete file is applied correctly
List<T> expectedRows = ImmutableList.of(
toRow(2, "aaa"),
toRow(4, "aaa")
);
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}

  @Test
public void testPositionDeleteWriterWithRow() throws IOException {
Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
WriterFactory<T> writerFactory = newWriterFactory(table.schema(), table.schema());
// write a data file
DataFile dataFile = writeData(writerFactory, dataRows, table.spec(), partition);
// write a position delete file and persist the deleted row
List<PositionDelete<T>> deletes = ImmutableList.of(
        new PositionDelete<T>().set(dataFile.path(), 0L, dataRows.get(0))
);
Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
DeleteFile deleteFile = result.first();
CharSequenceSet referencedDataFiles = result.second();
// verify the written delete file
GenericRecord deletedRow = GenericRecord.create(table.schema());
Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(table.schema());
GenericRecord deleteRecord = GenericRecord.create(positionDeleteSchema);
Map<String, Object> deleteRecordColumns = ImmutableMap.of(
DELETE_FILE_PATH.name(), dataFile.path(),
DELETE_FILE_POS.name(), 0L,
DELETE_FILE_ROW_FIELD_NAME, deletedRow.copy("id", 1, "data", "aaa")
);
List<Record> expectedDeletes = ImmutableList.of(deleteRecord.copy(deleteRecordColumns));
InputFile inputDeleteFile = table.io().newInputFile(deleteFile.path().toString());
List<Record> actualDeletes = readFile(positionDeleteSchema, inputDeleteFile);
Assert.assertEquals("Delete records must match", expectedDeletes, actualDeletes);
// commit the data and delete files
table.newRowDelta()
.addRows(dataFile)
.addDeletes(deleteFile)
.validateDataFilesExist(referencedDataFiles)
.validateDeletedFiles()
.commit();
// verify the delete file is applied correctly
List<T> expectedRows = ImmutableList.of(
toRow(2, "aaa"),
toRow(3, "aaa"),
toRow(4, "aaa"),
toRow(5, "aaa")
);
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}
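
  // builds a partition key for the data="aaa" partition that all generated rows fall into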
private PartitionKey initPartitionKey() {
Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
partitionKey.partition(record);
return partitionKey;
}

  private WriterFactory<T> newWriterFactory(Schema dataSchema) {
return newWriterFactory(dataSchema, null, null, null);
}

  private WriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
Schema equalityDeleteRowSchema) {
return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null);
}

  private WriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema);
}
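
  // writes rows with the factory's data writer; the returned DataFile metadata is
  // only complete once the writer has been closed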
private DataFile writeData(WriterFactory<T> writerFactory, List<T> rows,
PartitionSpec spec, StructLike partitionKey) throws IOException {
EncryptedOutputFile file = newOutputFile(spec, partitionKey);
DataWriter<T> writer = writerFactory.newDataWriter(file, spec, partitionKey);
try (DataWriter<T> closeableWriter = writer) {
for (T row : rows) {
closeableWriter.add(row);
}
}
return writer.toDataFile();
}
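
  // writes the given rows as an equality delete file and returns its metadata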
private DeleteFile writeEqualityDeletes(WriterFactory<T> writerFactory, List<T> deletes,
PartitionSpec spec, StructLike partitionKey) throws IOException {
EncryptedOutputFile file = newOutputFile(spec, partitionKey);
EqualityDeleteWriter<T> writer = writerFactory.newEqualityDeleteWriter(file, spec, partitionKey);
try (EqualityDeleteWriter<T> closableWriter = writer) {
closableWriter.deleteAll(deletes);
}
return writer.toDeleteFile();
}
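
  // writes position deletes and returns the delete file together with the set of
  // data file paths it references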
private Pair<DeleteFile, CharSequenceSet> writePositionDeletes(WriterFactory<T> writerFactory,
List<PositionDelete<T>> deletes,
PartitionSpec spec,
StructLike partitionKey) throws IOException {
EncryptedOutputFile file = newOutputFile(spec, partitionKey);
PositionDeleteWriter<T> writer = writerFactory.newPositionDeleteWriter(file, spec, partitionKey);
try (PositionDeleteWriter<T> closableWriter = writer) {
for (PositionDelete<T> delete : deletes) {
closableWriter.delete(delete.path(), delete.pos(), delete.row());
}
}
return Pair.of(writer.toDeleteFile(), writer.referencedDataFiles());
}
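
  // reads a delete file back for verification; only Avro and Parquet are needed here
  // because the ORC delete tests are skipped via Assume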
private List<Record> readFile(Schema schema, InputFile inputFile) throws IOException {
switch (fileFormat) {
case PARQUET:
try (CloseableIterable<Record> records = Parquet.read(inputFile)
.project(schema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
return ImmutableList.copyOf(records);
}
case AVRO:
try (CloseableIterable<Record> records = Avro.read(inputFile)
.project(schema)
.createReaderFunc(DataReader::create)
.build()) {
return ImmutableList.copyOf(records);
}
default:
throw new UnsupportedOperationException("Unsupported read file format: " + fileFormat);
}
}
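
  // reads the table's current rows into a set for order-independent comparison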
private StructLikeSet actualRowSet(String... columns) throws IOException {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
reader.forEach(set::add);
}
return set;
}

  private EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitionKey) {
return fileFactory.newOutputFile(spec, partitionKey);
}
}