// blob: 08b5cf500362df571c614d2500e26d809e83145b
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
 * Tests for {@link Transaction} instances created with {@code table.newTransaction()}.
 *
 * <p>Each test runs once per table format version (1 and 2). The tests check that operations
 * committed inside a transaction leave the table's metadata and version untouched until
 * {@code commitTransaction()} is called, and that retries, merges and file cleanup behave as
 * asserted in each case.
 */
@RunWith(Parameterized.class)
public class TestTransaction extends TableTestBase {
  /** Format versions every test in this class is run against. */
  @Parameterized.Parameters(name = "formatVersion = {0}")
  public static Object[] parameters() {
    return new Object[] { 1, 2 };
  }

  public TestTransaction(int formatVersion) {
    super(formatVersion);
  }

  /** Committing a transaction with no operations must not change metadata or the version. */
  @Test
  public void testEmptyTransaction() {
    Assert.assertEquals("Table should be on version 0", 0, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();
    txn.commitTransaction();

    Assert.assertSame("Base metadata should not change when an empty transaction is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0", 0, (int) version());
  }

  /** A single append inside a transaction becomes visible only after the transaction commits. */
  @Test
  public void testSingleOperationTransaction() {
    Assert.assertEquals("Table should be on version 0", 0, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after append", 0, (int) version());

    txn.commitTransaction();

    validateSnapshot(base.currentSnapshot(), readMetadata().currentSnapshot(), FILE_A, FILE_B);
    Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
  }

  /**
   * An append followed by a delete in one transaction produces one table version with a
   * snapshot for each operation.
   */
  @Test
  public void testMultipleOperationTransaction() {
    Assert.assertEquals("Table should be on version 0", 0, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after append", 0, (int) version());

    Snapshot appendSnapshot = txn.table().currentSnapshot();

    txn.newDelete()
        .deleteFile(FILE_A)
        .commit();

    Snapshot deleteSnapshot = txn.table().currentSnapshot();

    Assert.assertSame("Base metadata should not change when a delete is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after delete", 0, (int) version());

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
    Assert.assertEquals("Table should have one manifest after commit",
        1, readMetadata().currentSnapshot().allManifests().size());
    Assert.assertEquals("Table snapshot should be the delete snapshot",
        deleteSnapshot, readMetadata().currentSnapshot());
    validateManifestEntries(readMetadata().currentSnapshot().allManifests().get(0),
        ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()),
        files(FILE_A, FILE_B), statuses(Status.DELETED, Status.EXISTING));

    Assert.assertEquals("Table should have a snapshot for each operation",
        2, readMetadata().snapshots().size());
    validateManifestEntries(readMetadata().snapshots().get(0).allManifests().get(0),
        ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()),
        files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED));
  }

  /**
   * Same scenario as {@link #testMultipleOperationTransaction()}, but the delete is created
   * through {@code txn.table()} instead of directly from the transaction.
   */
  @Test
  public void testMultipleOperationTransactionFromTable() {
    Assert.assertEquals("Table should be on version 0", 0, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after append", 0, (int) version());

    Snapshot appendSnapshot = txn.table().currentSnapshot();

    txn.table().newDelete()
        .deleteFile(FILE_A)
        .commit();

    Snapshot deleteSnapshot = txn.table().currentSnapshot();

    Assert.assertSame("Base metadata should not change when a delete is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after delete", 0, (int) version());

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
    Assert.assertEquals("Table should have one manifest after commit",
        1, readMetadata().currentSnapshot().allManifests().size());
    Assert.assertEquals("Table snapshot should be the delete snapshot",
        deleteSnapshot, readMetadata().currentSnapshot());
    validateManifestEntries(readMetadata().currentSnapshot().allManifests().get(0),
        ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()),
        files(FILE_A, FILE_B), statuses(Status.DELETED, Status.EXISTING));

    Assert.assertEquals("Table should have a snapshot for each operation",
        2, readMetadata().snapshots().size());
    validateManifestEntries(readMetadata().snapshots().get(0).allManifests().get(0),
        ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()),
        files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED));
  }

  /** Starting a new operation while the previous one is uncommitted must be rejected. */
  @Test
  public void testDetectsUncommittedChange() {
    Assert.assertEquals("Table should be on version 0", 0, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());

    txn.newAppend().appendFile(FILE_A).appendFile(FILE_B); // not committed

    Assert.assertSame("Base metadata should not change when an append is left uncommitted",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after uncommitted append", 0, (int) version());

    AssertHelpers.assertThrows("Should reject new operation when last operation has not committed",
        IllegalStateException.class,
        "Cannot create new DeleteFiles: last operation has not committed",
        txn::newDelete);
  }

  /** Committing the transaction while its last operation is uncommitted must be rejected. */
  @Test
  public void testDetectsUncommittedChangeOnCommit() {
    Assert.assertEquals("Table should be on version 0", 0, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());

    txn.newAppend().appendFile(FILE_A).appendFile(FILE_B); // not committed

    Assert.assertSame("Base metadata should not change when an append is left uncommitted",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 0 after uncommitted append", 0, (int) version());

    AssertHelpers.assertThrows("Should reject commit when last operation has not committed",
        IllegalStateException.class,
        "Cannot commit transaction: last operation has not committed",
        txn::commitTransaction);
  }

  /** With retries disabled, one injected commit failure must surface to the caller. */
  @Test
  public void testTransactionConflict() {
    // set retries to 0 to catch the failure
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "0")
        .commit();

    Assert.assertEquals("Table should be on version 1", 1, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());

    // cause the transaction commit to fail
    table.ops().failCommits(1);

    AssertHelpers.assertThrows("Transaction commit should fail",
        CommitFailedException.class, "Injected failure", txn::commitTransaction);
  }

  /** With one retry allowed, one injected failure is retried and the append manifests reused. */
  @Test
  public void testTransactionRetry() {
    // use only one retry
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
        .commit();

    Assert.assertEquals("Table should be on version 1", 1, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Set<ManifestFile> appendManifests = Sets.newHashSet(txn.table().currentSnapshot().allManifests());

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());

    // make the first commit attempt fail; the single allowed retry is expected to succeed
    table.ops().failCommits(1);

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 2 after commit", 2, (int) version());

    Assert.assertEquals("Should reuse manifests from initial append commit",
        appendManifests, Sets.newHashSet(table.currentSnapshot().allManifests()));
  }

  /**
   * A conflicting append committed outside the transaction forces a retry; the final snapshot
   * is expected to contain the manifests of both appends unchanged.
   */
  @Test
  public void testTransactionRetryMergeAppend() {
    // use only one retry
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
        .commit();

    Assert.assertEquals("Table should be on version 1", 1, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Set<ManifestFile> appendManifests = Sets.newHashSet(txn.table().currentSnapshot().allManifests());

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());

    // commit a conflicting append directly on the table so the transaction commit must retry
    table.newAppend()
        .appendFile(FILE_C)
        .appendFile(FILE_D)
        .commit();

    Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());

    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().allManifests());

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());

    Set<ManifestFile> expectedManifests = Sets.newHashSet();
    expectedManifests.addAll(appendManifests);
    expectedManifests.addAll(conflictAppendManifests);

    Assert.assertEquals("Should reuse manifests from initial append commit and conflicting append",
        expectedManifests, Sets.newHashSet(table.currentSnapshot().allManifests()));
  }

  /**
   * Same as {@link #testTransactionRetryMergeCleanup()}, but the transaction also contains a
   * property update before the append.
   */
  @Test
  public void testMultipleUpdateTransactionRetryMergeCleanup() {
    // use only one retry and aggressively merge manifests
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
        .commit();

    Assert.assertEquals("Table should be on version 1", 1, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());

    txn.updateProperties()
        .set("test-property", "test-value")
        .commit();

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertEquals("Append should create one manifest",
        1, txn.table().currentSnapshot().allManifests().size());
    ManifestFile appendManifest = txn.table().currentSnapshot().allManifests().get(0);

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());

    // commit a conflicting append directly on the table so the transaction commit must retry
    table.newAppend()
        .appendFile(FILE_C)
        .appendFile(FILE_D)
        .commit();

    Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());

    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().allManifests());

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());

    Set<ManifestFile> previousManifests = Sets.newHashSet();
    previousManifests.add(appendManifest);
    previousManifests.addAll(conflictAppendManifests);

    Assert.assertEquals("Should merge both commit manifests into a single manifest",
        1, table.currentSnapshot().allManifests().size());
    Assert.assertFalse("Should merge both commit manifests into a new manifest",
        previousManifests.contains(table.currentSnapshot().allManifests().get(0)));

    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
  }

  /**
   * With aggressive manifest merging, a retried transaction commit is expected to merge its
   * manifest with the conflicting append's manifests and delete the transaction's original one.
   */
  @Test
  public void testTransactionRetryMergeCleanup() {
    // use only one retry and aggressively merge manifests
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
        .commit();

    Assert.assertEquals("Table should be on version 1", 1, (int) version());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    Assert.assertSame("Base metadata should not change when commit is created",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());

    txn.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertEquals("Append should create one manifest",
        1, txn.table().currentSnapshot().allManifests().size());
    ManifestFile appendManifest = txn.table().currentSnapshot().allManifests().get(0);

    Assert.assertSame("Base metadata should not change when an append is committed",
        base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());

    // commit a conflicting append directly on the table so the transaction commit must retry
    table.newAppend()
        .appendFile(FILE_C)
        .appendFile(FILE_D)
        .commit();

    Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());

    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().allManifests());

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());

    Set<ManifestFile> previousManifests = Sets.newHashSet();
    previousManifests.add(appendManifest);
    previousManifests.addAll(conflictAppendManifests);

    Assert.assertEquals("Should merge both commit manifests into a single manifest",
        1, table.currentSnapshot().allManifests().size());
    Assert.assertFalse("Should merge both commit manifests into a new manifest",
        previousManifests.contains(table.currentSnapshot().allManifests().get(0)));

    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
  }

  /**
   * Appends an externally written manifest inside a transaction, forces a retry with a
   * conflicting append, and checks that the copied manifest and the first merged manifest are
   * both tracked by the transaction as deleted and removed from disk after the commit.
   */
  @Test
  public void testTransactionRetryAndAppendManifests() throws Exception {
    // use only one retry and aggressively merge manifests
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
        .commit();

    Assert.assertEquals("Table should be on version 1", 1, (int) version());

    table.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertEquals("Table should be on version 2 after append", 2, (int) version());
    Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().allManifests().size());
    ManifestFile v1manifest = table.currentSnapshot().allManifests().get(0);

    TableMetadata base = readMetadata();

    // create a manifest append; the input manifest is written to the platform temp directory
    // (not a hard-coded /tmp) and is removed when the JVM exits
    File manifestInput = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString() + ".avro");
    manifestInput.deleteOnExit();
    OutputFile manifestLocation = Files.localOutput(manifestInput);
    ManifestWriter<DataFile> writer = ManifestFiles.write(table.spec(), manifestLocation);
    try {
      writer.add(FILE_D);
    } finally {
      writer.close();
    }

    Transaction txn = table.newTransaction();

    txn.newAppend()
        .appendManifest(writer.toManifestFile())
        .commit();

    Assert.assertSame("Base metadata should not change when an append is committed", base, readMetadata());
    Assert.assertEquals("Table should be on version 2 after append in transaction", 2, (int) version());

    Assert.assertEquals("Append should have one merged manifest",
        1, txn.table().currentSnapshot().allManifests().size());
    ManifestFile mergedManifest = txn.table().currentSnapshot().allManifests().get(0);

    // find the initial copy of the appended manifest
    String copiedAppendManifest = Iterables.getOnlyElement(Iterables.filter(
        Iterables.transform(listManifestFiles(), File::getPath),
        path -> !v1manifest.path().contains(path) && !mergedManifest.path().contains(path)));

    Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
        ((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
    Assert.assertTrue("Copied append manifest should not be deleted yet", new File(copiedAppendManifest).exists());

    // cause the transaction commit to fail and retry
    table.newAppend()
        .appendFile(FILE_C)
        .commit();

    Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version());

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version());

    Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
        ((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
    Assert.assertFalse("Copied append manifest should be deleted", new File(copiedAppendManifest).exists());
    Assert.assertTrue("Transaction should hijack the delete of the first merged manifest",
        ((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path()));
    Assert.assertFalse("Merged append manifest should be deleted", new File(mergedManifest.path()).exists());

    Assert.assertEquals("Should merge all commit manifests into a single manifest",
        1, table.currentSnapshot().allManifests().size());
  }

  /**
   * Variant of {@link #testTransactionRetryAndAppendManifests()} with snapshot ID inheritance
   * enabled: the assertions target the appended manifest itself rather than a copy of it.
   */
  @Test
  public void testTransactionRetryAndAppendManifestsWithSnapshotIdInheritance() throws Exception {
    // use only one retry and aggressively merge manifests
    table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
        .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
        .commit();

    Assert.assertEquals("Table should be on version 1", 1, (int) version());

    table.newAppend()
        .appendFile(FILE_A)
        .appendFile(FILE_B)
        .commit();

    Assert.assertEquals("Table should be on version 2 after append", 2, (int) version());
    Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().allManifests().size());

    TableMetadata base = readMetadata();

    Transaction txn = table.newTransaction();

    ManifestFile appendManifest = writeManifestWithName("input.m0", FILE_D);
    txn.newAppend()
        .appendManifest(appendManifest)
        .commit();

    Assert.assertSame("Base metadata should not change when an append is committed", base, readMetadata());
    Assert.assertEquals("Table should be on version 2 after append in transaction", 2, (int) version());

    Assert.assertEquals("Append should have one merged manifest",
        1, txn.table().currentSnapshot().allManifests().size());
    ManifestFile mergedManifest = txn.table().currentSnapshot().allManifests().get(0);

    // cause the transaction commit to fail and retry
    table.newAppend()
        .appendFile(FILE_C)
        .commit();

    Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version());

    txn.commitTransaction();

    Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version());

    Assert.assertTrue("Transaction should hijack the delete of the original append manifest",
        ((BaseTransaction) txn).deletedFiles().contains(appendManifest.path()));
    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());

    Assert.assertTrue("Transaction should hijack the delete of the first merged manifest",
        ((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path()));
    Assert.assertFalse("Merged append manifest should be deleted", new File(mergedManifest.path()).exists());

    Assert.assertEquals("Should merge all commit manifests into a single manifest",
        1, table.currentSnapshot().allManifests().size());
  }

  /** Calling {@code deleteWith} on an operation created from a transaction must be rejected. */
  @Test
  public void testTransactionNoCustomDeleteFunc() {
    AssertHelpers.assertThrows("Should fail setting a custom delete function with a transaction",
        IllegalArgumentException.class, "Cannot set delete callback more than once",
        () -> table.newTransaction()
            .newAppend()
            .appendFile(FILE_A)
            .appendFile(FILE_B)
            .deleteWith(file -> { }));
  }

  /**
   * Two fast appends in one transaction are expected to leave two manifests, even though the
   * minimum merge count is set to 0.
   */
  @Test
  public void testTransactionFastAppends() {
    table.updateProperties()
        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
        .commit();

    Transaction txn = table.newTransaction();

    txn.newFastAppend()
        .appendFile(FILE_A)
        .commit();

    txn.newFastAppend()
        .appendFile(FILE_B)
        .commit();

    txn.commitTransaction();

    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Expected 2 manifests", 2, manifests.size());
  }

  /**
   * Rewrites two manifests into one hand-written manifest and appends a file in the same
   * transaction, then checks that the hand-written manifest file survives the commit and is
   * removed only once snapshots are expired.
   */
  @Test
  public void testTransactionRewriteManifestsAppendedDirectly() throws IOException {
    // named loadedTable so the local does not shadow the inherited table fixture
    Table loadedTable = load();

    loadedTable.updateProperties()
        .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
        .commit();

    loadedTable.newFastAppend()
        .appendFile(FILE_A)
        .commit();
    long firstSnapshotId = loadedTable.currentSnapshot().snapshotId();

    loadedTable.newFastAppend()
        .appendFile(FILE_B)
        .commit();
    long secondSnapshotId = loadedTable.currentSnapshot().snapshotId();

    List<ManifestFile> manifests = loadedTable.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 2 manifests after 2 appends", 2, manifests.size());

    ManifestFile newManifest = writeManifest(
        "manifest-file-1.avro",
        manifestEntry(Status.EXISTING, firstSnapshotId, FILE_A),
        manifestEntry(Status.EXISTING, secondSnapshotId, FILE_B));

    Transaction txn = loadedTable.newTransaction();
    txn.rewriteManifests()
        .deleteManifest(manifests.get(0))
        .deleteManifest(manifests.get(1))
        .addManifest(newManifest)
        .commit();
    txn.newAppend()
        .appendFile(FILE_C)
        .commit();
    txn.commitTransaction();

    long finalSnapshotId = loadedTable.currentSnapshot().snapshotId();
    // captured after the commit; used below as the expiration cutoff
    long finalSnapshotTimestamp = System.currentTimeMillis();

    Assert.assertTrue("Append manifest should not be deleted", new File(newManifest.path()).exists());

    List<ManifestFile> finalManifests = loadedTable.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 1 final manifest", 1, finalManifests.size());

    validateManifestEntries(finalManifests.get(0),
        ids(finalSnapshotId, firstSnapshotId, secondSnapshotId),
        files(FILE_C, FILE_A, FILE_B),
        statuses(Status.ADDED, Status.EXISTING, Status.EXISTING));

    loadedTable.expireSnapshots()
        .expireOlderThan(finalSnapshotTimestamp + 1)
        .retainLast(1)
        .commit();

    Assert.assertFalse("Append manifest should be deleted on expiry", new File(newManifest.path()).exists());
  }
}