blob: 56a78e048ee168f5b4b18341143a779aaa9276ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestRewriteManifests extends TableTestBase {
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] { 1, 2 };
}
public TestRewriteManifests(int formatVersion) {
super(formatVersion);
}
@Test
public void testRewriteManifestsAppendedDirectly() throws IOException {
Table table = load();
table.updateProperties().set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
ManifestFile newManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
table.newFastAppend()
.appendManifest(newManifest)
.commit();
long appendId = table.currentSnapshot().snapshotId();
Assert.assertEquals(1, table.currentSnapshot().allManifests().size());
table.rewriteManifests()
.clusterBy(file -> "")
.commit();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(1, manifests.size());
validateManifestEntries(manifests.get(0),
ids(appendId),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
}
@Test
public void testRewriteManifestsGeneratedAndAppendedDirectly() throws IOException {
Table table = load();
table.updateProperties().set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
ManifestFile newManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));
table.newFastAppend()
.appendManifest(newManifest)
.commit();
long manifestAppendId = table.currentSnapshot().snapshotId();
table.newFastAppend()
.appendFile(FILE_B)
.commit();
long fileAppendId = table.currentSnapshot().snapshotId();
Assert.assertEquals(2, table.currentSnapshot().allManifests().size());
table.rewriteManifests()
.clusterBy(file -> "")
.commit();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals("Manifests must be merged into 1", 1, manifests.size());
// get the correct file order
List<DataFile> files;
List<Long> ids;
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_A.path())) {
files = Arrays.asList(FILE_A, FILE_B);
ids = Arrays.asList(manifestAppendId, fileAppendId);
} else {
files = Arrays.asList(FILE_B, FILE_A);
ids = Arrays.asList(fileAppendId, manifestAppendId);
}
}
validateManifestEntries(manifests.get(0),
ids.iterator(),
files.iterator(),
statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
}
@Test
public void testReplaceManifestsSeparate() {
Table table = load();
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
long appendId = table.currentSnapshot().snapshotId();
Assert.assertEquals(1, table.currentSnapshot().allManifests().size());
// cluster by path will split the manifest into two
table.rewriteManifests()
.clusterBy(file -> file.path())
.commit();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(2, manifests.size());
manifests.sort(Comparator.comparing(ManifestFile::path));
validateManifestEntries(manifests.get(0),
ids(appendId),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(manifests.get(1),
ids(appendId),
files(FILE_B),
statuses(ManifestEntry.Status.EXISTING));
}
@Test
public void testReplaceManifestsConsolidate() throws IOException {
Table table = load();
table.newFastAppend()
.appendFile(FILE_A)
.commit();
long appendIdA = table.currentSnapshot().snapshotId();
table.newFastAppend()
.appendFile(FILE_B)
.commit();
long appendIdB = table.currentSnapshot().snapshotId();
Assert.assertEquals(2, table.currentSnapshot().allManifests().size());
// cluster by constant will combine manifests into one
table.rewriteManifests()
.clusterBy(file -> "file")
.commit();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(1, manifests.size());
// get the file order correct
List<DataFile> files;
List<Long> ids;
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_A.path())) {
files = Arrays.asList(FILE_A, FILE_B);
ids = Arrays.asList(appendIdA, appendIdB);
} else {
files = Arrays.asList(FILE_B, FILE_A);
ids = Arrays.asList(appendIdB, appendIdA);
}
}
validateManifestEntries(manifests.get(0),
ids.iterator(),
files.iterator(),
statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
}
@Test
public void testReplaceManifestsWithFilter() throws IOException {
Table table = load();
table.newFastAppend()
.appendFile(FILE_A)
.commit();
long appendIdA = table.currentSnapshot().snapshotId();
table.newFastAppend()
.appendFile(FILE_B)
.commit();
long appendIdB = table.currentSnapshot().snapshotId();
table.newFastAppend()
.appendFile(FILE_C)
.commit();
long appendIdC = table.currentSnapshot().snapshotId();
Assert.assertEquals(3, table.currentSnapshot().allManifests().size());
// keep the file A manifest, combine the other two
table.rewriteManifests()
.clusterBy(file -> "file")
.rewriteIf(manifest -> {
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
return !reader.iterator().next().path().equals(FILE_A.path());
} catch (IOException x) {
throw new RuntimeIOException(x);
}
})
.commit();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(2, manifests.size());
// get the file order correct
List<DataFile> files;
List<Long> ids;
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_B.path())) {
files = Arrays.asList(FILE_B, FILE_C);
ids = Arrays.asList(appendIdB, appendIdC);
} else {
files = Arrays.asList(FILE_C, FILE_B);
ids = Arrays.asList(appendIdC, appendIdB);
}
}
validateManifestEntries(manifests.get(0),
ids.iterator(),
files.iterator(),
statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
validateManifestEntries(manifests.get(1),
ids(appendIdA),
files(FILE_A),
statuses(ManifestEntry.Status.ADDED));
}
@Test
public void testReplaceManifestsMaxSize() {
Table table = load();
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
long appendId = table.currentSnapshot().snapshotId();
Assert.assertEquals(1, table.currentSnapshot().allManifests().size());
// cluster by constant will combine manifests into one but small target size will create one per entry
BaseRewriteManifests rewriteManifests = spy((BaseRewriteManifests) table.rewriteManifests());
when(rewriteManifests.getManifestTargetSizeBytes()).thenReturn(1L);
rewriteManifests.clusterBy(file -> "file").commit();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(2, manifests.size());
manifests.sort(Comparator.comparing(ManifestFile::path));
validateManifestEntries(manifests.get(0),
ids(appendId),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(manifests.get(1),
ids(appendId),
files(FILE_B),
statuses(ManifestEntry.Status.EXISTING));
}
@Test
public void testConcurrentRewriteManifest() throws IOException {
Table table = load();
table.newFastAppend()
.appendFile(FILE_A)
.commit();
long appendIdA = table.currentSnapshot().snapshotId();
table.newFastAppend()
.appendFile(FILE_B)
.commit();
long appendIdB = table.currentSnapshot().snapshotId();
// start a rewrite manifests that involves both manifests
RewriteManifests rewrite = table.rewriteManifests();
rewrite.clusterBy(file -> "file").apply();
// commit a rewrite manifests that only involves one manifest
table.rewriteManifests()
.clusterBy(file -> "file")
.rewriteIf(manifest -> {
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
return !reader.iterator().next().path().equals(FILE_A.path());
} catch (IOException x) {
throw new RuntimeIOException(x);
}
})
.commit();
Assert.assertEquals(2, table.currentSnapshot().allManifests().size());
// commit the rewrite manifests in progress - this should perform a full rewrite as the manifest
// with file B is no longer part of the snapshot
rewrite.commit();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(1, manifests.size());
// get the file order correct
List<DataFile> files;
List<Long> ids;
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_A.path())) {
files = Arrays.asList(FILE_A, FILE_B);
ids = Arrays.asList(appendIdA, appendIdB);
} else {
files = Arrays.asList(FILE_B, FILE_A);
ids = Arrays.asList(appendIdB, appendIdA);
}
}
validateManifestEntries(manifests.get(0),
ids.iterator(),
files.iterator(),
statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
}
@Test
public void testAppendDuringRewriteManifest() {
Table table = load();
table.newFastAppend()
.appendFile(FILE_A)
.commit();
long appendIdA = table.currentSnapshot().snapshotId();
// start the rewrite manifests
RewriteManifests rewrite = table.rewriteManifests();
rewrite.clusterBy(file -> "file").apply();
// append a file
table.newFastAppend()
.appendFile(FILE_B)
.commit();
long appendIdB = table.currentSnapshot().snapshotId();
Assert.assertEquals(2, table.currentSnapshot().allManifests().size());
// commit the rewrite manifests in progress
rewrite.commit();
// the rewrite should only affect the first manifest, so we will end up with 2 manifests even though we
// have a single cluster key, rewritten one should be the first in the list
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(2, manifests.size());
validateManifestEntries(manifests.get(0),
ids(appendIdA),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(manifests.get(1),
ids(appendIdB),
files(FILE_B),
statuses(ManifestEntry.Status.ADDED));
}
@Test
public void testRewriteManifestDuringAppend() {
Table table = load();
table.newFastAppend()
.appendFile(FILE_A)
.commit();
long appendIdA = table.currentSnapshot().snapshotId();
// start an append
AppendFiles append = table.newFastAppend();
append.appendFile(FILE_B).apply();
// rewrite the manifests - only affects the first
table.rewriteManifests()
.clusterBy(file -> "file")
.commit();
Assert.assertEquals(1, table.currentSnapshot().allManifests().size());
// commit the append in progress
append.commit();
long appendIdB = table.currentSnapshot().snapshotId();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals(2, manifests.size());
// last append should be the first in the list
validateManifestEntries(manifests.get(0),
ids(appendIdB),
files(FILE_B),
statuses(ManifestEntry.Status.ADDED));
validateManifestEntries(manifests.get(1),
ids(appendIdA),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
}
@Test
public void testBasicManifestReplacement() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
table.newFastAppend()
.appendFile(FILE_C)
.appendFile(FILE_D)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
ManifestFile firstNewManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest = writeManifest(
"manifest-file-2.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
rewriteManifests.deleteManifest(firstSnapshotManifest);
rewriteManifests.addManifest(firstNewManifest);
rewriteManifests.addManifest(secondNewManifest);
rewriteManifests.commit();
Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests();
Assert.assertEquals(3, manifests.size());
validateSummary(snapshot, 1, 1, 2, 0);
validateManifestEntries(
manifests.get(0),
ids(firstSnapshot.snapshotId()),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(1),
ids(firstSnapshot.snapshotId()),
files(FILE_B),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(2),
ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()),
files(FILE_C, FILE_D),
statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
}
@Test
public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.updateProperties()
.set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
.commit();
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
table.newFastAppend()
.appendFile(FILE_C)
.appendFile(FILE_D)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
ManifestFile firstNewManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest = writeManifest(
"manifest-file-2.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
rewriteManifests.deleteManifest(firstSnapshotManifest);
rewriteManifests.addManifest(firstNewManifest);
rewriteManifests.addManifest(secondNewManifest);
rewriteManifests.commit();
Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests();
Assert.assertEquals(3, manifests.size());
validateSummary(snapshot, 1, 1, 2, 0);
validateManifestEntries(
manifests.get(0),
ids(firstSnapshot.snapshotId()),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(1),
ids(firstSnapshot.snapshotId()),
files(FILE_B),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(2),
ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()),
files(FILE_C, FILE_D),
statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
// validate that any subsequent operation does not fail
table.newDelete()
.deleteFromRowFilter(Expressions.alwaysTrue())
.commit();
}
@Test
public void testWithMultiplePartitionSpec() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
TableMetadata base = readMetadata();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().allManifests().size());
ManifestFile initialManifest = base.currentSnapshot().allManifests().get(0);
int initialPartitionSpecId = initialManifest.partitionSpecId();
// build the new spec using the table's schema, which uses fresh IDs
PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
.bucket("data", 16)
.bucket("id", 4)
.build();
// commit the new partition spec to the table manually
table.ops().commit(base, base.updatePartitionSpec(newSpec));
DataFile newFileY = DataFiles.builder(newSpec)
.withPath("/path/to/data-y.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=3")
.withRecordCount(1)
.build();
table.newAppend()
.appendFile(newFileY)
.commit();
DataFile newFileZ = DataFiles.builder(newSpec)
.withPath("/path/to/data-z.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=4")
.withRecordCount(1)
.build();
table.newAppend()
.appendFile(newFileZ)
.commit();
Assert.assertEquals("Should use 3 manifest files",
3, table.currentSnapshot().allManifests().size());
RewriteManifests rewriteManifests = table.rewriteManifests();
// try to cluster in 1 manifest file, but because of 2 partition specs
// we should still have 2 manifest files.
rewriteManifests.clusterBy(dataFile -> "file").commit();
List<ManifestFile> manifestFiles = table.currentSnapshot().allManifests();
Assert.assertEquals("Rewrite manifest should produce 2 manifest files",
2, manifestFiles.size());
Assert.assertEquals("2 manifest files should have different partitionSpecId",
true, manifestFiles.get(0).partitionSpecId() != manifestFiles.get(1).partitionSpecId());
matchNumberOfManifestFileWithSpecId(manifestFiles, initialPartitionSpecId, 1);
matchNumberOfManifestFileWithSpecId(manifestFiles, table.ops().current().spec().specId(), 1);
Assert.assertEquals("first manifest file should have 2 data files",
Integer.valueOf(2), manifestFiles.get(0).existingFilesCount());
Assert.assertEquals("second manifest file should have 2 data files",
Integer.valueOf(2), manifestFiles.get(1).existingFilesCount());
}
@Test
public void testManifestSizeWithMultiplePartitionSpec() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
TableMetadata base = readMetadata();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().allManifests().size());
ManifestFile initialManifest = base.currentSnapshot().allManifests().get(0);
int initialPartitionSpecId = initialManifest.partitionSpecId();
// build the new spec using the table's schema, which uses fresh IDs
PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
.bucket("data", 16)
.bucket("id", 4)
.build();
// commit the new partition spec to the table manually
table.ops().commit(base, base.updatePartitionSpec(newSpec));
DataFile newFileY = DataFiles.builder(newSpec)
.withPath("/path/to/data-y.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=3")
.withRecordCount(1)
.build();
table.newAppend()
.appendFile(newFileY)
.commit();
DataFile newFileZ = DataFiles.builder(newSpec)
.withPath("/path/to/data-z.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=4")
.withRecordCount(1)
.build();
table.newAppend()
.appendFile(newFileZ)
.commit();
Assert.assertEquals("Rewrite manifests should produce 3 manifest files",
3, table.currentSnapshot().allManifests().size());
// cluster by constant will combine manifests into one but small target size will create one per entry
BaseRewriteManifests rewriteManifests = spy((BaseRewriteManifests) table.rewriteManifests());
when(rewriteManifests.getManifestTargetSizeBytes()).thenReturn(1L);
// rewriteManifests should produce 4 manifestFiles, because of targetByteSize=1
rewriteManifests.clusterBy(dataFile -> "file").commit();
List<ManifestFile> manifestFiles = table.currentSnapshot().allManifests();
Assert.assertEquals("Should use 4 manifest files",
4, manifestFiles.size());
matchNumberOfManifestFileWithSpecId(manifestFiles, initialPartitionSpecId, 2);
matchNumberOfManifestFileWithSpecId(manifestFiles, table.ops().current().spec().specId(), 2);
Assert.assertEquals("first manifest file should have 1 data files",
Integer.valueOf(1), manifestFiles.get(0).existingFilesCount());
Assert.assertEquals("second manifest file should have 1 data files",
Integer.valueOf(1), manifestFiles.get(1).existingFilesCount());
Assert.assertEquals("third manifest file should have 1 data files",
Integer.valueOf(1), manifestFiles.get(2).existingFilesCount());
Assert.assertEquals("fourth manifest file should have 1 data files",
Integer.valueOf(1), manifestFiles.get(3).existingFilesCount());
}
@Test
public void testManifestReplacementConcurrentAppend() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
ManifestFile firstNewManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest = writeManifest(
"manifest-file-2.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
rewriteManifests.deleteManifest(firstSnapshotManifest);
rewriteManifests.addManifest(firstNewManifest);
rewriteManifests.addManifest(secondNewManifest);
table.newFastAppend()
.appendFile(FILE_C)
.appendFile(FILE_D)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals(2, table.currentSnapshot().allManifests().size());
rewriteManifests.commit();
Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests();
Assert.assertEquals(3, manifests.size());
validateSummary(snapshot, 1, 1, 2, 0);
validateManifestEntries(
manifests.get(0),
ids(firstSnapshot.snapshotId()),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(1),
ids(firstSnapshot.snapshotId()),
files(FILE_B),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(2),
ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()),
files(FILE_C, FILE_D),
statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
}
@Test
public void testManifestReplacementConcurrentDelete() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.updateProperties()
.set(MANIFEST_MERGE_ENABLED, "false")
.commit();
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
table.newFastAppend()
.appendFile(FILE_C)
.appendFile(FILE_D)
.commit();
long secondSnapshotId = table.currentSnapshot().snapshotId();
ManifestFile firstNewManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest = writeManifest(
"manifest-file-2.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
rewriteManifests.deleteManifest(firstSnapshotManifest);
rewriteManifests.addManifest(firstNewManifest);
rewriteManifests.addManifest(secondNewManifest);
table.newDelete()
.deleteFile(FILE_C)
.commit();
long thirdSnapshotId = table.currentSnapshot().snapshotId();
rewriteManifests.commit();
Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests();
Assert.assertEquals(3, manifests.size());
validateSummary(snapshot, 1, 1, 2, 0);
validateManifestEntries(
manifests.get(0),
ids(firstSnapshot.snapshotId()),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(1),
ids(firstSnapshot.snapshotId()),
files(FILE_B),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(2),
ids(thirdSnapshotId, secondSnapshotId),
files(FILE_C, FILE_D),
statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
}
@Test
public void testManifestReplacementConcurrentConflictingDelete() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newFastAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
ManifestFile firstNewManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
ManifestFile secondNewManifest = writeManifest(
"manifest-file-2.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
RewriteManifests rewriteManifests = table.rewriteManifests();
rewriteManifests.deleteManifest(firstSnapshotManifest);
rewriteManifests.addManifest(firstNewManifest);
rewriteManifests.addManifest(secondNewManifest);
table.newDelete()
.deleteFile(FILE_A)
.commit();
AssertHelpers.assertThrows("Should reject commit",
ValidationException.class, "Manifest is missing",
rewriteManifests::commit);
}
@Test
public void testManifestReplacementCombinedWithRewrite() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newFastAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
table.newFastAppend()
.appendFile(FILE_B)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
table.newFastAppend()
.appendFile(FILE_C)
.commit();
table.newFastAppend()
.appendFile(FILE_D)
.commit();
Assert.assertEquals(4, Iterables.size(table.snapshots()));
ManifestFile newManifest = writeManifest(
"manifest-file-1.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
table.rewriteManifests()
.deleteManifest(firstSnapshotManifest)
.addManifest(newManifest)
.clusterBy(dataFile -> "const-value")
.rewriteIf(manifest -> {
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
return !reader.iterator().next().path().equals(FILE_B.path());
} catch (IOException x) {
throw new RuntimeIOException(x);
}
})
.commit();
Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests();
Assert.assertEquals(3, manifests.size());
validateSummary(snapshot, 3, 1, 2, 2);
validateManifestEntries(
manifests.get(1),
ids(firstSnapshot.snapshotId()),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(2),
ids(secondSnapshot.snapshotId()),
files(FILE_B),
statuses(ManifestEntry.Status.ADDED));
}
@Test
public void testManifestReplacementCombinedWithRewriteConcurrentDelete() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.updateProperties()
.set(MANIFEST_MERGE_ENABLED, "false")
.commit();
table.newFastAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
table.newFastAppend()
.appendFile(FILE_B)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
table.newFastAppend()
.appendFile(FILE_C)
.commit();
Assert.assertEquals(3, Iterables.size(table.snapshots()));
ManifestEntry<DataFile> entry = manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A);
// update the entry's sequence number or else it will be rejected by the writer
entry.setSequenceNumber(firstSnapshot.sequenceNumber());
ManifestFile newManifest = writeManifest("manifest-file-1.avro", entry);
RewriteManifests rewriteManifests = table.rewriteManifests()
.deleteManifest(firstSnapshotManifest)
.addManifest(newManifest)
.clusterBy(dataFile -> "const-value");
rewriteManifests.apply();
table.newDelete()
.deleteFile(FILE_C)
.commit();
rewriteManifests.commit();
Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests();
Assert.assertEquals(2, manifests.size());
validateSummary(snapshot, 3, 0, 2, 1);
validateManifestEntries(
manifests.get(0),
ids(secondSnapshot.snapshotId()),
files(FILE_B),
statuses(ManifestEntry.Status.EXISTING));
validateManifestEntries(
manifests.get(1),
ids(firstSnapshot.snapshotId()),
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
}
@Test
public void testInvalidUsage() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newFastAppend()
.appendFile(FILE_A)
.commit();
Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests();
Assert.assertEquals(1, manifests.size());
ManifestFile manifest = manifests.get(0);
ManifestEntry<DataFile> appendEntry = manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A);
// update the entry's sequence number or else it will be rejected by the writer
appendEntry.setSequenceNumber(snapshot.sequenceNumber());
ManifestFile invalidAddedFileManifest = writeManifest("manifest-file-2.avro", appendEntry);
AssertHelpers.assertThrows("Should reject commit",
IllegalArgumentException.class, "Cannot add manifest with added files",
() -> table.rewriteManifests()
.deleteManifest(manifest)
.addManifest(invalidAddedFileManifest)
.commit());
ManifestEntry<DataFile> deleteEntry = manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A);
// update the entry's sequence number or else it will be rejected by the writer
deleteEntry.setSequenceNumber(snapshot.sequenceNumber());
ManifestFile invalidDeletedFileManifest = writeManifest("manifest-file-3.avro", deleteEntry);
AssertHelpers.assertThrows("Should reject commit",
IllegalArgumentException.class, "Cannot add manifest with deleted files",
() -> table.rewriteManifests()
.deleteManifest(manifest)
.addManifest(invalidDeletedFileManifest)
.commit());
AssertHelpers.assertThrows("Should reject commit",
ValidationException.class, "must have the same number of active files",
() -> table.rewriteManifests()
.deleteManifest(manifest)
.commit());
}
@Test
public void testManifestReplacementFailure() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.newFastAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
table.newFastAppend()
.appendFile(FILE_B)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
List<ManifestFile> secondSnapshotManifests = secondSnapshot.allManifests();
Assert.assertEquals(2, secondSnapshotManifests.size());
ManifestFile secondSnapshotManifest = secondSnapshotManifests.get(0);
ManifestFile newManifest = writeManifest(
"manifest-file.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A),
manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshot.snapshotId(), FILE_B));
table.updateProperties()
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
.commit();
table.ops().failCommits(5);
RewriteManifests rewriteManifests = table.rewriteManifests();
rewriteManifests.deleteManifest(firstSnapshotManifest);
rewriteManifests.deleteManifest(secondSnapshotManifest);
rewriteManifests.addManifest(newManifest);
AssertHelpers.assertThrows("Should reject commit",
CommitFailedException.class, "Injected failure",
rewriteManifests::commit);
Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists());
}
@Test
public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOException {
Assert.assertNull("Table should be empty", table.currentSnapshot());
table.updateProperties().set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
table.newFastAppend()
.appendFile(FILE_A)
.commit();
Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests();
Assert.assertEquals(1, firstSnapshotManifests.size());
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
table.newFastAppend()
.appendFile(FILE_B)
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
List<ManifestFile> secondSnapshotManifests = secondSnapshot.allManifests();
Assert.assertEquals(2, secondSnapshotManifests.size());
ManifestFile secondSnapshotManifest = secondSnapshotManifests.get(0);
ManifestFile newManifest = writeManifest(
"manifest-file.avro",
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A),
manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshot.snapshotId(), FILE_B));
table.updateProperties()
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
.commit();
table.ops().failCommits(5);
RewriteManifests rewriteManifests = table.rewriteManifests();
rewriteManifests.deleteManifest(firstSnapshotManifest);
rewriteManifests.deleteManifest(secondSnapshotManifest);
rewriteManifests.addManifest(newManifest);
AssertHelpers.assertThrows("Should reject commit",
CommitFailedException.class, "Injected failure",
rewriteManifests::commit);
Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists());
}
private void validateSummary(Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
Map<String, String> summary = snapshot.summary();
Assert.assertEquals(
"Replaced manifest count should match",
replaced, Integer.parseInt(summary.get("manifests-replaced")));
Assert.assertEquals(
"Kept manifest count should match",
kept, Integer.parseInt(summary.get("manifests-kept")));
Assert.assertEquals(
"Created manifest count should match",
created, Integer.parseInt(summary.get("manifests-created")));
Assert.assertEquals(
"Entry count should match",
entryCount, Integer.parseInt(summary.get("entries-processed")));
}
private void matchNumberOfManifestFileWithSpecId(List<ManifestFile> manifestFiles, int toBeMatchedPartitionSpecId,
int numberOfManifestWithPartitionSpecID) {
long matchedManifestsCounter = manifestFiles.stream()
.filter(m -> m.partitionSpecId() == toBeMatchedPartitionSpecId).count();
Assert.assertEquals("manifest list should have " + numberOfManifestWithPartitionSpecID +
" manifests matching this partitionSpecId " + toBeMatchedPartitionSpecId,
numberOfManifestWithPartitionSpecID, matchedManifestsCounter);
}
}