/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.actions;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.optional;

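/**
 * Tests for the rewrite manifests action, run once with snapshot ID inheritance
 * enabled and once with it disabled (see {@code parameters()} below).
 */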
@RunWith(Parameterized.class)
public abstract class TestRewriteManifestsAction extends SparkTestBase {
  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
  private static final Schema SCHEMA = new Schema(
      optional(1, "c1", Types.IntegerType.get()),
      optional(2, "c2", Types.StringType.get()),
      optional(3, "c3", Types.StringType.get())
  );

  @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
  public static Object[] parameters() {
    return new Object[] { "true", "false" };
  }

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  private final String snapshotIdInheritanceEnabled;
  private String tableLocation = null;

  public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
    this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
  }

  @Before
  public void setupTableLocation() throws Exception {
    File tableDir = temp.newFolder();
    this.tableLocation = tableDir.toURI().toString();
  }

  @Test
  public void testRewriteManifestsEmptyTable() throws IOException {
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    Assert.assertNull("Table must be empty", table.currentSnapshot());

    Actions actions = Actions.forTable(table);

    actions.rewriteManifests()
        .rewriteIf(manifest -> true)
        .stagingLocation(temp.newFolder().toString())
        .execute();

    Assert.assertNull("Table must stay empty", table.currentSnapshot());
  }

  @Test
  public void testRewriteSmallManifestsNonPartitionedTable() {
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    List<ThreeColumnRecord> records1 = Lists.newArrayList(
        new ThreeColumnRecord(1, null, "AAAA"),
        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
    );
    writeRecords(records1);

    List<ThreeColumnRecord> records2 = Lists.newArrayList(
        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
    );
    writeRecords(records2);

    table.refresh();

    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

    Actions actions = Actions.forTable(table);

    RewriteManifestsActionResult result = actions.rewriteManifests()
        .rewriteIf(manifest -> true)
        .execute();

    Assert.assertEquals("Action should rewrite 2 manifests", 2, result.deletedManifests().size());
    Assert.assertEquals("Action should add 1 manifest", 1, result.addedManifests().size());

    table.refresh();

    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 1 manifest after rewrite", 1, newManifests.size());

    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());

    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
    expectedRecords.addAll(records1);
    expectedRecords.addAll(records2);

    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
        .as(Encoders.bean(ThreeColumnRecord.class))
        .collectAsList();

    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
  }

  @Test
  public void testRewriteSmallManifestsPartitionedTable() {
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
        .identity("c1")
        .truncate("c2", 2)
        .build();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    List<ThreeColumnRecord> records1 = Lists.newArrayList(
        new ThreeColumnRecord(1, null, "AAAA"),
        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
    );
    writeRecords(records1);

    List<ThreeColumnRecord> records2 = Lists.newArrayList(
        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
    );
    writeRecords(records2);

    List<ThreeColumnRecord> records3 = Lists.newArrayList(
        new ThreeColumnRecord(3, "EEEEEEEEEE", "EEEE"),
        new ThreeColumnRecord(3, "FFFFFFFFFF", "FFFF")
    );
    writeRecords(records3);

    List<ThreeColumnRecord> records4 = Lists.newArrayList(
        new ThreeColumnRecord(4, "GGGGGGGGGG", "GGGG"),
        new ThreeColumnRecord(4, "HHHHHHHHHG", "HHHH")
    );
    writeRecords(records4);

    table.refresh();

    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 4 manifests before rewrite", 4, manifests.size());

    Actions actions = Actions.forTable(table);

    // we expect 2 manifests with 4 entries each after the rewrite
    long manifestEntrySizeBytes = computeManifestEntrySizeBytes(manifests);
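    // target size for 4 average-sized entries, with ~5% slack so that 4 entries reliably fit per manifest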
    long targetManifestSizeBytes = (long) (1.05 * 4 * manifestEntrySizeBytes);

    table.updateProperties()
        .set(TableProperties.MANIFEST_TARGET_SIZE_BYTES, String.valueOf(targetManifestSizeBytes))
        .commit();

    RewriteManifestsActionResult result = actions.rewriteManifests()
        .rewriteIf(manifest -> true)
        .execute();

    Assert.assertEquals("Action should rewrite 4 manifests", 4, result.deletedManifests().size());
    Assert.assertEquals("Action should add 2 manifests", 2, result.addedManifests().size());

    table.refresh();

    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());

    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());

    Assert.assertEquals(4, (long) newManifests.get(1).existingFilesCount());
    Assert.assertFalse(newManifests.get(1).hasAddedFiles());
    Assert.assertFalse(newManifests.get(1).hasDeletedFiles());

    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
    expectedRecords.addAll(records1);
    expectedRecords.addAll(records2);
    expectedRecords.addAll(records3);
    expectedRecords.addAll(records4);

    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
        .as(Encoders.bean(ThreeColumnRecord.class))
        .collectAsList();

    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
  }

  @Test
  public void testRewriteImportedManifests() throws IOException {
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
        .identity("c3")
        .build();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    List<ThreeColumnRecord> records = Lists.newArrayList(
        new ThreeColumnRecord(1, null, "AAAA"),
        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
    );

    File parquetTableDir = temp.newFolder("parquet_table");
    String parquetTableLocation = parquetTableDir.toURI().toString();

    try {
      Dataset<Row> inputDF = spark.createDataFrame(records, ThreeColumnRecord.class);
      inputDF.select("c1", "c2", "c3")
          .write()
          .format("parquet")
          .mode("overwrite")
          .option("path", parquetTableLocation)
          .partitionBy("c3")
          .saveAsTable("parquet_table");

      File stagingDir = temp.newFolder("staging-dir");
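      // import the existing Parquet files into the Iceberg table by generating manifests for them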
      SparkTableUtil.importSparkTable(spark, new TableIdentifier("parquet_table"), table, stagingDir.toString());

      Snapshot snapshot = table.currentSnapshot();

      Actions actions = Actions.forTable(table);

      RewriteManifestsActionResult result = actions.rewriteManifests()
          .rewriteIf(manifest -> true)
          .stagingLocation(temp.newFolder().toString())
          .execute();

      Assert.assertEquals("Action should rewrite all manifests", snapshot.allManifests(), result.deletedManifests());
      Assert.assertEquals("Action should add 1 manifest", 1, result.addedManifests().size());
    } finally {
      spark.sql("DROP TABLE parquet_table");
    }
  }

  @Test
  public void testRewriteLargeManifestsPartitionedTable() throws IOException {
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
        .identity("c3")
        .build();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    // all records belong to the same partition
    List<ThreeColumnRecord> records = Lists.newArrayList();
    for (int i = 0; i < 50; i++) {
      records.add(new ThreeColumnRecord(i, String.valueOf(i), "0"));
    }

    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
    // repartition to create separate data files
    writeDF(df.repartition(50, df.col("c1")));

    table.refresh();

    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 1 manifest before rewrite", 1, manifests.size());

    // set the target manifest size to a small value to force splitting records into multiple files
    table.updateProperties()
        .set(TableProperties.MANIFEST_TARGET_SIZE_BYTES, String.valueOf(manifests.get(0).length() / 2))
        .commit();

    Actions actions = Actions.forTable(table);

    RewriteManifestsActionResult result = actions.rewriteManifests()
        .rewriteIf(manifest -> true)
        .stagingLocation(temp.newFolder().toString())
        .execute();

    Assert.assertEquals("Action should rewrite 1 manifest", 1, result.deletedManifests().size());
    Assert.assertEquals("Action should add 2 manifests", 2, result.addedManifests().size());

    table.refresh();

    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());

    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
        .as(Encoders.bean(ThreeColumnRecord.class))
        .collectAsList();

    Assert.assertEquals("Rows must match", records, actualRecords);
  }

  @Test
  public void testRewriteManifestsWithPredicate() throws IOException {
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
        .identity("c1")
        .truncate("c2", 2)
        .build();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    List<ThreeColumnRecord> records1 = Lists.newArrayList(
        new ThreeColumnRecord(1, null, "AAAA"),
        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
    );
    writeRecords(records1);

    List<ThreeColumnRecord> records2 = Lists.newArrayList(
        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
    );
    writeRecords(records2);

    table.refresh();

    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

    Actions actions = Actions.forTable(table);

    // rewrite only the first manifest without caching
    RewriteManifestsActionResult result = actions.rewriteManifests()
        .rewriteIf(manifest -> manifest.path().equals(manifests.get(0).path()))
        .stagingLocation(temp.newFolder().toString())
        .useCaching(false)
        .execute();

    Assert.assertEquals("Action should rewrite 1 manifest", 1, result.deletedManifests().size());
    Assert.assertEquals("Action should add 1 manifest", 1, result.addedManifests().size());

    table.refresh();

    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
    Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
    Assert.assertTrue("Second manifest must not be rewritten", newManifests.contains(manifests.get(1)));

    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
    expectedRecords.addAll(records1);
    expectedRecords.addAll(records2);

    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
        .as(Encoders.bean(ThreeColumnRecord.class))
        .collectAsList();

    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
  }

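  /** Appends the given records to the test table through the Iceberg Spark source. */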
  private void writeRecords(List<ThreeColumnRecord> records) {
    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
    writeDF(df);
  }

  private void writeDF(Dataset<Row> df) {
    df.select("c1", "c2", "c3")
        .write()
        .format("iceberg")
        .mode("append")
        .save(tableLocation);
  }

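  /**
   * Estimates the average size of a manifest entry by dividing the total length of all
   * manifests by the total number of added, existing, and deleted entries they contain.
   */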
  private long computeManifestEntrySizeBytes(List<ManifestFile> manifests) {
    long totalSize = 0L;
    int numEntries = 0;

    for (ManifestFile manifest : manifests) {
      totalSize += manifest.length();
      numEntries += manifest.addedFilesCount() + manifest.existingFilesCount() + manifest.deletedFilesCount();
    }

    return totalSize / numEntries;
  }
}