/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.parquet;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
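
/**
 * Tests for the Parquet delete writers: each case writes a delete file, checks the
 * {@link DeleteFile} metadata it reports, and reads the file back to verify its contents.
 */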
public class TestParquetDeleteWriters {
  private static final Schema SCHEMA = new Schema(
      NestedField.required(1, "id", Types.LongType.get()),
      NestedField.optional(2, "data", Types.StringType.get()));

  private List<Record> records;

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();
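
  // Five rows, (1L, "a") through (5L, "e"), used by every test as the records to delete.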
  @Before
  public void createDeleteRecords() {
    GenericRecord record = GenericRecord.create(SCHEMA);

    ImmutableList.Builder<Record> builder = ImmutableList.builder();
    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
    this.records = builder.build();
  }
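
  // Writes all five records as equality deletes keyed on field id 1 ("id") and
  // verifies that the file round-trips the deleted rows.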
  @Test
  public void testEqualityDeleteWriter() throws IOException {
    File deleteFile = temp.newFile();

    OutputFile out = Files.localOutput(deleteFile);
    EqualityDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(out)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .overwrite()
        .rowSchema(SCHEMA)
        .withSpec(PartitionSpec.unpartitioned())
        .equalityFieldIds(1)
        .buildEqualityWriter();

    try (EqualityDeleteWriter<Record> writer = deleteWriter) {
      writer.deleteAll(records);
    }
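
    // closing the writer finalizes the delete file and its metadata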
    DeleteFile metadata = deleteWriter.toDeleteFile();
    Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
    Assert.assertEquals("Should be equality deletes", FileContent.EQUALITY_DELETES, metadata.content());
    Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
    Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
    Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
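
    // equality delete files store full rows, so reading the file back should
    // produce the same records that were deleted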
    List<Record> deletedRecords;
    try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
        .project(SCHEMA)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(SCHEMA, fileSchema))
        .build()) {
      deletedRecords = Lists.newArrayList(reader);
    }

    Assert.assertEquals("Deleted records should match expected", records, deletedRecords);
  }
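
  // Writes position deletes that carry the deleted row along with its file path and
  // position, and verifies both the metadata and the file contents.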
  @Test
  public void testPositionDeleteWriter() throws IOException {
    File deleteFile = temp.newFile();

    Schema deleteSchema = new Schema(
        MetadataColumns.DELETE_FILE_PATH,
        MetadataColumns.DELETE_FILE_POS,
        NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));

    String deletePath = "s3://bucket/path/file.parquet";
    GenericRecord posDelete = GenericRecord.create(deleteSchema);
    List<Record> expectedDeleteRecords = Lists.newArrayList();

    OutputFile out = Files.localOutput(deleteFile);
    PositionDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(out)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .overwrite()
        .rowSchema(SCHEMA)
        .withSpec(PartitionSpec.unpartitioned())
        .buildPositionWriter();
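
    // delete every third position (2, 5, 8, ...) and keep the expected
    // file_path/pos/row records for the read-back check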
    try (PositionDeleteWriter<Record> writer = deleteWriter) {
      for (int i = 0; i < records.size(); i += 1) {
        int pos = i * 3 + 2;
        writer.delete(deletePath, pos, records.get(i));
        expectedDeleteRecords.add(posDelete.copy(ImmutableMap.of(
            "file_path", deletePath,
            "pos", (long) pos,
            "row", records.get(i))));
      }
    }

    DeleteFile metadata = deleteWriter.toDeleteFile();
    Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
    Assert.assertEquals("Should be position deletes", FileContent.POSITION_DELETES, metadata.content());
    Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
    Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
    Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
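
    // read the file back with the three-column position-delete schema, row included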
    List<Record> deletedRecords;
    try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
        .project(deleteSchema)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
        .build()) {
      deletedRecords = Lists.newArrayList(reader);
    }

    Assert.assertEquals("Deleted records should match expected", expectedDeleteRecords, deletedRecords);
  }
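
  // Same as testPositionDeleteWriter, but without a row schema: the writer stores only
  // file_path and pos, and each delete passes a null row.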
  @Test
  public void testPositionDeleteWriterWithEmptyRow() throws IOException {
    File deleteFile = temp.newFile();

    Schema deleteSchema = new Schema(
        MetadataColumns.DELETE_FILE_PATH,
        MetadataColumns.DELETE_FILE_POS);

    String deletePath = "s3://bucket/path/file.parquet";
    GenericRecord posDelete = GenericRecord.create(deleteSchema);
    List<Record> expectedDeleteRecords = Lists.newArrayList();

    OutputFile out = Files.localOutput(deleteFile);
    PositionDeleteWriter<Void> deleteWriter = Parquet.writeDeletes(out)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .overwrite()
        .withSpec(PartitionSpec.unpartitioned())
        .buildPositionWriter();

    try (PositionDeleteWriter<Void> writer = deleteWriter) {
      for (int i = 0; i < records.size(); i += 1) {
        int pos = i * 3 + 2;
        writer.delete(deletePath, pos, null);
        expectedDeleteRecords.add(posDelete.copy(ImmutableMap.of(
            "file_path", deletePath,
            "pos", (long) pos)));
      }
    }

    DeleteFile metadata = deleteWriter.toDeleteFile();
    Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
    Assert.assertEquals("Should be position deletes", FileContent.POSITION_DELETES, metadata.content());
    Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
    Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
    Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
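
    // without a row schema, the file should contain only the file_path and pos columns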
    List<Record> deletedRecords;
    try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
        .project(deleteSchema)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
        .build()) {
      deletedRecords = Lists.newArrayList(reader);
    }

    Assert.assertEquals("Deleted records should match expected", expectedDeleteRecords, deletedRecords);
  }
}