// blob: 08b5cf500362df571c614d2500e26d809e83145b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
 * Tests for {@link Transaction}.
 *
 * <p>Changes staged in a transaction must leave the table's committed metadata and version
 * untouched until the transaction is committed. Transaction commits must also survive
 * retries, conflicting commits and manifest merging without leaving superseded manifest
 * files behind.
 *
 * <p>The suite is parameterized over table format versions (see {@code parameters()}).
 *
 * <p>NOTE(review): this copy looks like a lossy extraction of the upstream file. The following
 * appear to be missing; restore them from upstream before compiling:
 * <ul>
 *   <li>the {@code @RunWith(Parameterized.class)} and {@code @Test} annotations;</li>
 *   <li>closing braces and many statements;</li>
 *   <li>imports for {@code Sets}, {@code Iterables}, {@code File} and {@code IOException}.</li>
 * </ul>
 */
public class TestTransaction extends TableTestBase {
/**
 * Table format versions the suite runs against.
 *
 * @return the format versions 1 and 2. When the class runs under JUnit's
 *     {@code Parameterized} runner, each value becomes one run labelled
 *     {@code formatVersion = <value>}.
 */
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] { 1, 2 };
// NOTE(review): the closing brace of this method is missing in this copy.
/**
 * Creates the test instance for one parameterized run.
 *
 * @param formatVersion table format version supplied by {@code parameters()}
 */
public TestTransaction(int formatVersion) {
// NOTE(review): the constructor body is missing in this copy. It presumably passes
// formatVersion on to TableTestBase (e.g. super(formatVersion)); confirm against upstream.
/**
 * Verifies that an empty transaction leaves the table at version 0 with its base metadata
 * unchanged.
 *
 * <p>NOTE(review): the {@code @Test} annotation and the closing brace are missing in this copy.
 * The call that commits the empty transaction (presumably {@code txn.commitTransaction()})
 * also appears to have been dropped.
 */
public void testEmptyTransaction() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
// assertSame: the table's current metadata must be the very same object read before the txn
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0", 0, (int) version());
/**
 * Verifies that a single append staged in a transaction is not visible in the table until the
 * transaction commits. The commit must then move the table to version 1.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the append itself, presumably
 *       {@code txn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit()};</li>
 *   <li>the {@code txn.commitTransaction()} call.</li>
 * </ul>
 */
public void testSingleOperationTransaction() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
// committing an operation inside the txn must still leave the table's metadata untouched
Assert.assertSame("Base metadata should not change when an append is committed",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after append", 0, (int) version());
// Compares the new current snapshot against the pre-transaction one. validateSnapshot is
// presumably inherited from TableTestBase and checks that FILE_A and FILE_B were added; confirm.
validateSnapshot(base.currentSnapshot(), readMetadata().currentSnapshot(), FILE_A, FILE_B);
Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
/**
 * Verifies an append followed by a delete in one transaction.
 *
 * <p>Neither operation is visible before the transaction commits. Afterwards:
 * <ul>
 *   <li>the table is at version 1;</li>
 *   <li>its current snapshot is the delete snapshot, which has a single manifest;</li>
 *   <li>one snapshot is kept for each operation.</li>
 * </ul>
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the append and delete operations;</li>
 *   <li>the {@code txn.commitTransaction()} call;</li>
 *   <li>the heads of both manifest-entry validation calls.</li>
 * </ul>
 */
public void testMultipleOperationTransaction() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
Snapshot appendSnapshot = txn.table().currentSnapshot();
// NOTE(review): as shown, both snapshots are read back-to-back and would be identical. Upstream
// presumably runs the delete between these two reads.
Snapshot deleteSnapshot = txn.table().currentSnapshot();
Assert.assertSame("Base metadata should not change when an append is committed",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after append", 0, (int) version());
Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
Assert.assertEquals("Table should have one manifest after commit",
1, readMetadata().currentSnapshot().allManifests().size());
Assert.assertEquals("Table snapshot should be the delete snapshot",
deleteSnapshot, readMetadata().currentSnapshot());
// NOTE(review): the head of this call is missing. It presumably validates the current snapshot's
// manifest: FILE_A marked DELETED by the delete, FILE_B carried over as EXISTING.
ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()),
files(FILE_A, FILE_B), statuses(Status.DELETED, Status.EXISTING));
Assert.assertEquals("Table should have a snapshot for each operation",
2, readMetadata().snapshots().size());
// NOTE(review): head missing here too. It presumably validates the append snapshot's manifest,
// where both files were ADDED by the append.
ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()),
files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED));
/**
 * Same append-then-delete scenario as the plain multiple-operation test. Judging by the name,
 * the operations are presumably created through {@code txn.table()} rather than directly on
 * {@code txn}.
 *
 * <p>After the commit:
 * <ul>
 *   <li>the table is at version 1;</li>
 *   <li>the delete snapshot, with a single manifest, is current;</li>
 *   <li>two snapshots exist.</li>
 * </ul>
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the append and delete operations;</li>
 *   <li>the {@code txn.commitTransaction()} call;</li>
 *   <li>the heads of both manifest-entry validation calls.</li>
 * </ul>
 */
public void testMultipleOperationTransactionFromTable() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
Snapshot appendSnapshot = txn.table().currentSnapshot();
// NOTE(review): the delete operation presumably runs between these two reads; as shown, both
// variables would hold the same snapshot.
Snapshot deleteSnapshot = txn.table().currentSnapshot();
Assert.assertSame("Base metadata should not change when an append is committed",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after append", 0, (int) version());
Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
Assert.assertEquals("Table should have one manifest after commit",
1, readMetadata().currentSnapshot().allManifests().size());
Assert.assertEquals("Table snapshot should be the delete snapshot",
deleteSnapshot, readMetadata().currentSnapshot());
// NOTE(review): call head missing. It presumably validates the current manifest:
// FILE_A DELETED, FILE_B EXISTING.
ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()),
files(FILE_A, FILE_B), statuses(Status.DELETED, Status.EXISTING));
Assert.assertEquals("Table should have a snapshot for each operation",
2, readMetadata().snapshots().size());
// NOTE(review): call head missing. It presumably validates the append snapshot's manifest:
// both files ADDED.
ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()),
files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED));
/**
 * Verifies that a transaction refuses to create a new delete operation while its previous
 * operation is still uncommitted. That pending operation is an append of FILE_A and FILE_B
 * that was never committed.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the remaining {@code assertThrows} arguments: the expected exception class and the
 *       lambda that presumably calls {@code txn.newDelete()}.</li>
 * </ul>
 */
public void testDetectsUncommittedChange() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
txn.newAppend().appendFile(FILE_A).appendFile(FILE_B); // not committed
// the pending, uncommitted append must not have touched the table
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
AssertHelpers.assertThrows("Should reject commit when last operation has not committed",
"Cannot create new DeleteFiles: last operation has not committed",
/**
 * Verifies that committing a transaction fails while its last operation is still uncommitted.
 * That pending operation is an append of FILE_A and FILE_B that was never committed.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the remaining {@code assertThrows} arguments: the expected exception class and the
 *       call that presumably invokes {@code txn.commitTransaction()}.</li>
 * </ul>
 */
public void testDetectsUncommittedChangeOnCommit() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
txn.newAppend().appendFile(FILE_A).appendFile(FILE_B); // not committed
// the pending, uncommitted append must not have touched the table
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
AssertHelpers.assertThrows("Should reject commit when last operation has not committed",
"Cannot commit transaction: last operation has not committed",
/**
 * Verifies that when commit retries are disabled ({@code COMMIT_NUM_RETRIES = 0}), a failing
 * transaction commit reaches the caller. It must surface as a {@link CommitFailedException}
 * mentioning "Injected failure".
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the head and tail of the property-update chain, presumably
 *       {@code table.updateProperties() ... .commit()};</li>
 *   <li>the append staged in the transaction;</li>
 *   <li>the statement that injects the commit failure.</li>
 * </ul>
 */
public void testTransactionConflict() {
// set retries to 0 to catch the failure
.set(TableProperties.COMMIT_NUM_RETRIES, "0")
Assert.assertEquals("Table should be on version 1", 1, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
// NOTE(review): this check follows the staged append (see the "after append" message below),
// so its "commit is created" message looks copy-pasted.
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());
// cause the transaction commit to fail
AssertHelpers.assertThrows("Transaction commit should fail",
CommitFailedException.class, "Injected failure", txn::commitTransaction);
/**
 * Verifies that with one retry allowed, a transaction whose first commit attempt fails is
 * retried successfully, leaving the table at version 2. The retry must reuse the manifests
 * written by the transaction's append rather than writing new ones.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the head and tail of the property-update chain;</li>
 *   <li>the append staged in the transaction;</li>
 *   <li>the failure injection and the {@code txn.commitTransaction()} call.</li>
 * </ul>
 */
public void testTransactionRetry() {
// use only one retry
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
Assert.assertEquals("Table should be on version 1", 1, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
// manifests written by the staged append; the retried commit must end up with exactly these
Set<ManifestFile> appendManifests = Sets.newHashSet(txn.table().currentSnapshot().allManifests());
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());
// cause the transaction commit to fail
Assert.assertEquals("Table should be on version 2 after commit", 2, (int) version());
Assert.assertEquals("Should reuse manifests from initial append commit",
appendManifests, Sets.newHashSet(table.currentSnapshot().allManifests()));
/**
 * Verifies a transaction retry after a conflicting append.
 *
 * <p>The sequence is:
 * <ol>
 *   <li>the transaction's first commit fails;</li>
 *   <li>a separate "real" append lands, bringing the table to version 2;</li>
 *   <li>the retried transaction commit brings the table to version 3.</li>
 * </ol>
 * The final snapshot must reuse the manifests of both the transaction's append and the
 * conflicting append.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the property-update head and tail;</li>
 *   <li>the staged append;</li>
 *   <li>the failure injection, the real append and the commit.</li>
 * </ul>
 */
public void testTransactionRetryMergeAppend() {
// use only one retry
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
Assert.assertEquals("Table should be on version 1", 1, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
// manifests produced by the transaction's own append
Set<ManifestFile> appendManifests = Sets.newHashSet(txn.table().currentSnapshot().allManifests());
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());
// cause the transaction commit to fail
Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
// manifests produced by the conflicting append committed outside the transaction
Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().allManifests());
Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
// NOTE(review): expectedManifests is created empty here. The statements adding appendManifests
// and conflictAppendManifests to it appear to be missing; as shown, the assertion would expect
// no manifests at all.
Set<ManifestFile> expectedManifests = Sets.newHashSet();
Assert.assertEquals("Should reuse manifests from initial append commit and conflicting append",
expectedManifests, Sets.newHashSet(table.currentSnapshot().allManifests()));
/**
 * Verifies manifest merging and cleanup when a transaction holding both a property update and
 * an append must be retried after a conflicting append. The property update sets
 * "test-property" to "test-value", presumably via {@code txn.updateProperties()}.
 *
 * <p>With {@code MANIFEST_MIN_MERGE_COUNT = 0}, the final commit (version 3) must:
 * <ul>
 *   <li>merge the manifests of both appends into a single new manifest;</li>
 *   <li>delete the transaction's original append manifest file.</li>
 * </ul>
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the heads and tails of both property-update chains;</li>
 *   <li>the staged append;</li>
 *   <li>the failure injection, the real append and the commit;</li>
 *   <li>the statements filling {@code previousManifests};</li>
 *   <li>the second argument of the first {@code assertFalse}.</li>
 * </ul>
 */
public void testMultipleUpdateTransactionRetryMergeCleanup() {
// use only one retry and aggressively merge manifests
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
Assert.assertEquals("Table should be on version 1", 1, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
.set("test-property", "test-value")
Assert.assertEquals("Append should create one manifest",
1, txn.table().currentSnapshot().allManifests().size());
// remembered so the test can later check that this file was cleaned up after the retry
ManifestFile appendManifest = txn.table().currentSnapshot().allManifests().get(0);
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());
// cause the transaction commit to fail
Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().allManifests());
Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
// NOTE(review): created empty; the statements adding the earlier manifests appear to be missing
Set<ManifestFile> previousManifests = Sets.newHashSet();
Assert.assertEquals("Should merge both commit manifests into a single manifest",
1, table.currentSnapshot().allManifests().size());
// NOTE(review): the next call is truncated. It presumably asserts that previousManifests does not
// contain the final merged manifest.
Assert.assertFalse("Should merge both commit manifests into a new manifest",
Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
/**
 * Verifies manifest merging and cleanup when a transaction's append must be retried after a
 * conflicting append.
 *
 * <p>With {@code MANIFEST_MIN_MERGE_COUNT = 0}, the final commit (version 3) must:
 * <ul>
 *   <li>merge the manifests of both appends into a single new manifest;</li>
 *   <li>delete the transaction's original append manifest file.</li>
 * </ul>
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the property-update head and tail;</li>
 *   <li>the staged append;</li>
 *   <li>the failure injection, the real append and the commit;</li>
 *   <li>the statements filling {@code previousManifests};</li>
 *   <li>the second argument of the first {@code assertFalse}.</li>
 * </ul>
 */
public void testTransactionRetryMergeCleanup() {
// use only one retry and aggressively merge manifests
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
Assert.assertEquals("Table should be on version 1", 1, (int) version());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
Assert.assertEquals("Append should create one manifest",
1, txn.table().currentSnapshot().allManifests().size());
// remembered so the test can later check that this file was cleaned up after the retry
ManifestFile appendManifest = txn.table().currentSnapshot().allManifests().get(0);
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());
// cause the transaction commit to fail
Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().allManifests());
Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
// NOTE(review): created empty; the statements adding the earlier manifests appear to be missing
Set<ManifestFile> previousManifests = Sets.newHashSet();
Assert.assertEquals("Should merge both commit manifests into a single manifest",
1, table.currentSnapshot().allManifests().size());
// NOTE(review): the next call is truncated. It presumably asserts that previousManifests does not
// contain the final merged manifest.
Assert.assertFalse("Should merge both commit manifests into a new manifest",
Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
/**
 * Verifies retry and cleanup when a transaction appends a pre-written manifest file.
 *
 * <p>The transaction works on a copy of the appended manifest and merges it. It takes over
 * ("hijacks") deletion of both the copy and the first merged manifest. Those files must still
 * exist before the transaction commits and must be gone once the retried commit succeeds
 * (version 4), which leaves a single merged manifest.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the property-update head and tail;</li>
 *   <li>the initial append;</li>
 *   <li>the try/finally bodies;</li>
 *   <li>the manifest append inside the transaction;</li>
 *   <li>the failure injection, the real append and the commit.</li>
 * </ul>
 */
public void testTransactionRetryAndAppendManifests() throws Exception {
// use only one retry and aggressively merge manifests
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
Assert.assertEquals("Table should be on version 1", 1, (int) version());
Assert.assertEquals("Table should be on version 2 after append", 2, (int) version());
Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().allManifests().size());
ManifestFile v1manifest = table.currentSnapshot().allManifests().get(0);
TableMetadata base = readMetadata();
// create a manifest append
// NOTE(review): hard-codes a /tmp location, which is not portable, and nothing visible here
// removes the file. Consider the test's temporary folder instead.
OutputFile manifestLocation = Files.localOutput("/tmp/" + UUID.randomUUID().toString() + ".avro");
ManifestWriter<DataFile> writer = ManifestFiles.write(table.spec(), manifestLocation);
// NOTE(review): both bodies below are missing in this copy. The try presumably adds data file(s)
// to writer and the finally presumably closes it.
try {
} finally {
Transaction txn = table.newTransaction();
Assert.assertSame("Base metadata should not change when commit is created", base, readMetadata());
Assert.assertEquals("Table should be on version 2 after txn create", 2, (int) version());
Assert.assertEquals("Append should have one merged manifest",
1, txn.table().currentSnapshot().allManifests().size());
ManifestFile mergedManifest = txn.table().currentSnapshot().allManifests().get(0);
// find the initial copy of the appended manifest
// (the one manifest file on disk that is neither the v1 manifest nor the merged one;
// String.contains is presumably used because ManifestFile.path() may carry a prefix that
// File.getPath() lacks; confirm)
String copiedAppendManifest = Iterables.getOnlyElement(Iterables.filter(
Iterables.transform(listManifestFiles(), File::getPath),
path -> !v1manifest.path().contains(path) && !mergedManifest.path().contains(path)));
// before commit: the txn has claimed the copy for deletion but must not have deleted it yet
Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
Assert.assertTrue("Copied append manifest should not be deleted yet", new File(copiedAppendManifest).exists());
// cause the transaction commit to fail and retry
Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version());
Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version());
// after the successful retry: the claimed files are actually removed from disk
Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
Assert.assertFalse("Append manifest should be deleted", new File(copiedAppendManifest).exists());
Assert.assertTrue("Transaction should hijack the delete of the first merged manifest",
((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path()));
// NOTE(review): the message says "Append manifest" but this line checks the first merged manifest
Assert.assertFalse("Append manifest should be deleted", new File(mergedManifest.path()).exists());
Assert.assertEquals("Should merge all commit manifests into a single manifest",
1, table.currentSnapshot().allManifests().size());
/**
 * Verifies retry and cleanup of an appended manifest with snapshot ID inheritance enabled.
 *
 * <p>The manifest is written as "input.m0" with FILE_D. The transaction claims deletion of that
 * manifest and of the first merged manifest. Both files must be gone once the retried commit
 * succeeds (version 4), which leaves a single merged manifest.
 *
 * <p>Unlike the non-inheritance variant, no copied manifest is looked up here. The appended
 * manifest's own path is checked in {@code deletedFiles()}.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the property-update head and tail;</li>
 *   <li>the initial append;</li>
 *   <li>the manifest append inside the transaction, presumably passing {@code appendManifest};</li>
 *   <li>the failure injection, the real append and the commit.</li>
 * </ul>
 */
public void testTransactionRetryAndAppendManifestsWithSnapshotIdInheritance() throws Exception {
// use only one retry and aggressively merge manifests
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
.set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
Assert.assertEquals("Table should be on version 1", 1, (int) version());
Assert.assertEquals("Table should be on version 2 after append", 2, (int) version());
Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().allManifests().size());
TableMetadata base = readMetadata();
Transaction txn = table.newTransaction();
ManifestFile appendManifest = writeManifestWithName("input.m0", FILE_D);
Assert.assertSame("Base metadata should not change when commit is created", base, readMetadata());
Assert.assertEquals("Table should be on version 2 after txn create", 2, (int) version());
Assert.assertEquals("Append should have one merged manifest",
1, txn.table().currentSnapshot().allManifests().size());
ManifestFile mergedManifest = txn.table().currentSnapshot().allManifests().get(0);
// cause the transaction commit to fail and retry
Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version());
Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version());
// after the successful retry: both claimed manifests are removed from disk
Assert.assertTrue("Transaction should hijack the delete of the original append manifest",
((BaseTransaction) txn).deletedFiles().contains(appendManifest.path()));
Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
Assert.assertTrue("Transaction should hijack the delete of the first merged manifest",
((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path()));
Assert.assertFalse("Merged append manifest should be deleted", new File(mergedManifest.path()).exists());
Assert.assertEquals("Should merge all commit manifests into a single manifest",
1, table.currentSnapshot().allManifests().size());
/**
 * Verifies that an operation created from a transaction rejects a custom delete callback. The
 * call must fail with {@link IllegalArgumentException} ("Cannot set delete callback more than
 * once"), presumably because the transaction has already installed its own callback.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the operation created between {@code newTransaction()} and {@code deleteWith(...)},
 *       presumably {@code .newAppend()}.</li>
 * </ul>
 */
public void testTransactionNoCustomDeleteFunc() {
AssertHelpers.assertThrows("Should fail setting a custom delete function with a transaction",
IllegalArgumentException.class, "Cannot set delete callback more than once",
() -> table.newTransaction()
.deleteWith(file -> { }));
/**
 * Verifies that, even with {@code MANIFEST_MIN_MERGE_COUNT = 0}, the committed transaction's
 * snapshot ends up with 2 manifests. Judging by the name, the transaction presumably performs
 * two fast appends, which are not merged.
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the property-update head and tail;</li>
 *   <li>the fast appends;</li>
 *   <li>the {@code txn.commitTransaction()} call.</li>
 * </ul>
 */
public void testTransactionFastAppends() {
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
Transaction txn = table.newTransaction();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals("Expected 2 manifests", 2, manifests.size());
/**
 * Verifies a transaction that rewrites manifests using a manifest written directly by the test,
 * with snapshot ID inheritance enabled.
 *
 * <p>The sequence is:
 * <ol>
 *   <li>two appends produce 2 manifests;</li>
 *   <li>a hand-written manifest keeps FILE_A and FILE_B as EXISTING with their original
 *       snapshot IDs, presumably replacing the old manifests via {@code rewriteManifests()};</li>
 *   <li>after the commit, the table has a single manifest in which FILE_C is ADDED and
 *       FILE_A/FILE_B are EXISTING;</li>
 *   <li>the hand-written manifest file survives the commit but is deleted once older
 *       snapshots are expired.</li>
 * </ol>
 *
 * <p>NOTE(review): the following appear to be missing in this copy:
 * <ul>
 *   <li>the {@code @Test} annotation and the closing brace;</li>
 *   <li>the property-update head and tail;</li>
 *   <li>the two appends;</li>
 *   <li>the transaction's operations and commit;</li>
 *   <li>the head of the manifest-entry validation call;</li>
 *   <li>the head and tail of the snapshot-expiry chain.</li>
 * </ul>
 */
public void testTransactionRewriteManifestsAppendedDirectly() throws IOException {
// NOTE(review): this local shadows the `table` used by the other tests (presumably a
// TableTestBase field); confirm that is intended.
Table table = load();
.set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
// NOTE(review): the appends that create these two snapshots are missing between/around these reads
long firstSnapshotId = table.currentSnapshot().snapshotId();
long secondSnapshotId = table.currentSnapshot().snapshotId();
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertEquals("Should have 2 manifests after 2 appends", 2, manifests.size());
// hand-written manifest carrying both files forward with their original snapshot IDs
ManifestFile newManifest = writeManifest(
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshotId, FILE_A),
manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshotId, FILE_B));
Transaction txn = table.newTransaction();
long finalSnapshotId = table.currentSnapshot().snapshotId();
// wall-clock cutoff, used below as the expiry threshold
long finalSnapshotTimestamp = System.currentTimeMillis();
Assert.assertTrue("Append manifest should not be deleted", new File(newManifest.path()).exists());
List<ManifestFile> finalManifests = table.currentSnapshot().allManifests();
Assert.assertEquals("Should have 1 final manifest", 1, finalManifests.size());
// NOTE(review): the head of this validation call is missing; it presumably validates the entries
// of finalManifests.get(0)
ids(finalSnapshotId, firstSnapshotId, secondSnapshotId),
files(FILE_C, FILE_A, FILE_B),
statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
// NOTE(review): fragment of a snapshot-expiry chain, presumably
// table.expireSnapshots().expireOlderThan(...).commit()
.expireOlderThan(finalSnapshotTimestamp + 1)
Assert.assertFalse("Append manifest should be deleted on expiry", new File(newManifest.path()).exists());