blob: b09c6c0bba6cb36a1abca867b55a731e36464726 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tephra;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.persist.TransactionSnapshot;
import org.apache.tephra.persist.TransactionStateStorage;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* Base class for testing implementations of {@link TransactionSystemClient}.
*/
public abstract class TransactionSystemTest {
private static final byte[] C1 = new byte[] { 'c', '1' };
private static final byte[] C2 = new byte[] { 'c', '2' };
private static final byte[] C3 = new byte[] { 'c', '3' };
private static final byte[] C4 = new byte[] { 'c', '4' };
/**
* Sets up the common properties required for the test cases defined here.
* Subclasses can call this and add more properties as needed.
*
* @param existing An existing configuration to be modified. If null, a new confoguration is created.
*/
@SuppressWarnings("WeakerAccess")
protected static Configuration getCommonConfiguration(@Nullable Configuration existing) {
Configuration conf = existing != null ? existing : new Configuration();
conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit
conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT, 50);
conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD, 40);
conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT, 2048);
conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD, 1024);
return conf;
}
protected abstract TransactionSystemClient getClient() throws Exception;
protected abstract TransactionStateStorage getStateStorage() throws Exception;
@Test // can't do (expected=IllegalArgumentException) because the subclass needs to perform an extra assert
public void testNegativeTimeout() throws Exception {
try {
getClient().startShort(-1);
Assert.fail("Expected illegal argument for negative timeout");
} catch (IllegalArgumentException e) {
// expected
}
}
@Test // can't do (expected=IllegalArgumentException) because the subclass needs to perform an extra assert
public void testExcessiveTimeout() throws Exception {
try {
getClient().startShort((int) TimeUnit.DAYS.toSeconds(10));
Assert.fail("Expected illegal argument for excessive timeout");
} catch (IllegalArgumentException e) {
// expected
}
}
@Test
public void testLargeChangeSet() throws Exception {
TransactionSystemClient client = getClient();
// first try with 50 changes (the max allowed)
List<byte[]> fiftyChanges = new ArrayList<>(51);
for (byte i = 0; i < 50; i++) {
fiftyChanges.add(new byte[] { i });
}
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, fiftyChanges);
client.commitOrThrow(tx);
// now try another transaction with 51 changes
fiftyChanges.add(new byte[] { 50 });
tx = client.startShort();
try {
client.canCommitOrThrow(tx, fiftyChanges);
Assert.fail("Expected " + TransactionSizeException.class.getName());
} catch (TransactionSizeException e) {
// expected
}
client.abort(tx);
// now try a change set that is just within the size limit
List<byte[]> changes2k = new ArrayList<>(51);
for (byte i = 0; i < 8; i++) {
byte[] change = new byte[256];
change[0] = i;
changes2k.add(change);
}
tx = client.startShort();
client.canCommitOrThrow(tx, changes2k);
client.commitOrThrow(tx);
// now add another byte to the change set to exceed the limit
changes2k.add(new byte[] { 0 });
tx = client.startShort();
try {
client.canCommitOrThrow(tx, changes2k);
Assert.fail("Expected " + TransactionSizeException.class.getName());
} catch (TransactionSizeException e) {
// expected
}
client.abort(tx);
}
@Test
public void testCommitRaceHandling() throws Exception {
TransactionSystemClient client1 = getClient();
TransactionSystemClient client2 = getClient();
Transaction tx1 = client1.startShort();
Transaction tx2 = client2.startShort();
client1.canCommitOrThrow(tx1, asList(C1, C2));
// second one also can commit even thought there are conflicts with first since first one hasn't committed yet
client2.canCommitOrThrow(tx2, asList(C2, C3));
client1.commitOrThrow(tx1);
// now second one should not commit, since there are conflicts with tx1 that has been committed
assertCommitConflicts(client2, tx2);
}
@Test
public void testMultipleCommitsAtSameTime() throws Exception {
// We want to check that if two txs finish at same time (wrt tx manager) they do not overwrite changesets of each
// other in tx manager used for conflicts detection (we had this bug)
// NOTE: you don't have to use multiple clients for that
TransactionSystemClient client1 = getClient();
TransactionSystemClient client2 = getClient();
TransactionSystemClient client3 = getClient();
TransactionSystemClient client4 = getClient();
TransactionSystemClient client5 = getClient();
Transaction tx1 = client1.startShort();
Transaction tx2 = client2.startShort();
Transaction tx3 = client3.startShort();
Transaction tx4 = client4.startShort();
Transaction tx5 = client5.startShort();
client1.canCommitOrThrow(tx1, asList(C1));
client1.commitOrThrow(tx1);
client2.canCommitOrThrow(tx2, asList(C2));
client2.commitOrThrow(tx2);
// verifying conflicts detection
assertCanCommitConflicts(client3, tx3, asList(C1));
assertCanCommitConflicts(client4, tx4, asList(C2));
client5.canCommitOrThrow(tx5, asList(C3));
}
@Test
public void testCommitTwice() throws Exception {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, asList(C1, C2));
client.commitOrThrow(tx);
// cannot commit twice same tx
assertCommitNotInProgress(client, tx);
}
@Test
public void testAbortTwice() throws Exception {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, asList(C1, C2));
client.abort(tx);
// abort of not active tx has no affect
client.abort(tx);
}
@Test
public void testReuseTx() throws Exception {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, asList(C1, C2));
client.commitOrThrow(tx);
// can't re-use same tx again
assertCanCommitNotInProgress(client, tx, asList(C3, C4));
assertCommitNotInProgress(client, tx);
// abort of not active tx has no affect
client.abort(tx);
}
@Test
public void testUseNotStarted() throws Exception {
TransactionSystemClient client = getClient();
Transaction tx1 = client.startShort();
client.commitOrThrow(tx1);
// we know this is one is older than current writePointer and was not used
Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1,
new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
TransactionType.SHORT);
assertCanCommitNotInProgress(client, txOld, asList(C3, C4));
assertCommitNotInProgress(client, txOld);
// abort of not active tx has no affect
client.abort(txOld);
// we know this is one is newer than current readPointer and was not used
Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1,
new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
TransactionType.SHORT);
assertCanCommitNotInProgress(client, txNew, asList(C3, C4));
assertCommitNotInProgress(client, txNew);
// abort of not active tx has no affect
client.abort(txNew);
}
@Test
public void testAbortAfterCommit() throws Exception {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, asList(C1, C2));
client.commitOrThrow(tx);
// abort of not active tx has no affect
client.abort(tx);
}
// todo add test invalidate method
@Test
public void testInvalidateTx() throws Exception {
TransactionSystemClient client = getClient();
// Invalidate an in-progress tx
Transaction tx1 = client.startShort();
client.canCommitOrThrow(tx1, asList(C1, C2));
Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
// Cannot invalidate a committed tx
Transaction tx2 = client.startShort();
client.canCommitOrThrow(tx2, asList(C3, C4));
client.commitOrThrow(tx2);
Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
}
@Test
public void testResetState() throws Exception {
// have tx in progress, committing and committed then reset,
// get the last snapshot and see that it is empty
TransactionSystemClient client = getClient();
TransactionStateStorage stateStorage = getStateStorage();
Transaction tx1 = client.startShort();
Transaction tx2 = client.startShort();
client.canCommitOrThrow(tx1, asList(C1, C2));
client.commitOrThrow(tx1);
client.canCommitOrThrow(tx2, asList(C3, C4));
Transaction txPreReset = client.startShort();
long currentTs = System.currentTimeMillis();
client.resetState();
TransactionSnapshot snapshot = stateStorage.getLatestSnapshot();
Assert.assertTrue(snapshot.getTimestamp() >= currentTs);
Assert.assertEquals(0, snapshot.getInvalid().size());
Assert.assertEquals(0, snapshot.getInProgress().size());
Assert.assertEquals(0, snapshot.getCommittingChangeSets().size());
Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
// confirm that transaction IDs are not reset
Transaction txPostReset = client.startShort();
Assert.assertTrue("New tx ID should be greater than last ID before reset",
txPostReset.getTransactionId() > txPreReset.getTransactionId());
}
@Test
public void testTruncateInvalidTx() throws Exception {
// Start few transactions and invalidate all of them
TransactionSystemClient client = getClient();
Transaction tx1 = client.startLong();
Transaction tx2 = client.startShort();
Transaction tx3 = client.startLong();
client.invalidate(tx1.getTransactionId());
client.invalidate(tx2.getTransactionId());
client.invalidate(tx3.getTransactionId());
// Remove tx2 and tx3 from invalid list
Assert.assertTrue(client.truncateInvalidTx(ImmutableSet.of(tx2.getTransactionId(), tx3.getTransactionId())));
Transaction tx = client.startShort();
// Only tx1 should be in invalid list now
Assert.assertArrayEquals(new long[] {tx1.getTransactionId()}, tx.getInvalids());
client.abort(tx);
}
@Test
public void testTruncateInvalidTxBefore() throws Exception {
// Start few transactions
TransactionSystemClient client = getClient();
Transaction tx1 = client.startLong();
Transaction tx2 = client.startShort();
// Sleep so that transaction ids get generated a millisecond apart for assertion
// TEPHRA-63 should eliminate the need to sleep
TimeUnit.MILLISECONDS.sleep(1);
long beforeTx3 = System.currentTimeMillis();
Transaction tx3 = client.startLong();
try {
// throws exception since tx1 and tx2 are still in-progress
client.truncateInvalidTxBefore(beforeTx3);
Assert.fail("Expected InvalidTruncateTimeException exception");
} catch (InvalidTruncateTimeException e) {
// Expected exception
}
// Invalidate all of them
client.invalidate(tx1.getTransactionId());
client.invalidate(tx2.getTransactionId());
client.invalidate(tx3.getTransactionId());
// Remove transactions before time beforeTx3
Assert.assertTrue(client.truncateInvalidTxBefore(beforeTx3));
Transaction tx = client.startShort();
// Only tx3 should be in invalid list now
Assert.assertArrayEquals(new long[] {tx3.getTransactionId()}, tx.getInvalids());
client.abort(tx);
}
@Test
public void testGetInvalidSize() throws Exception {
// Start few transactions and invalidate all of them
TransactionSystemClient client = getClient();
Transaction tx1 = client.startLong();
Transaction tx2 = client.startShort();
Transaction tx3 = client.startLong();
Assert.assertEquals(0, client.getInvalidSize());
client.invalidate(tx1.getTransactionId());
client.invalidate(tx2.getTransactionId());
client.invalidate(tx3.getTransactionId());
Assert.assertEquals(3, client.getInvalidSize());
}
@Test
public void testCheckpointing() throws Exception {
TransactionSystemClient client = getClient();
// create a few transactions
Transaction tx1 = client.startShort();
Transaction tx2 = client.startShort();
Transaction tx3 = client.startShort();
// start and commit a few
for (int i = 0; i < 5; i++) {
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, Collections.singleton(new byte[] { (byte) i }));
client.commitOrThrow(tx);
}
// checkpoint the transactions
Transaction tx3c = client.checkpoint(tx3);
Transaction tx2c = client.checkpoint(tx2);
Transaction tx1c = client.checkpoint(tx1);
// start and commit a few (this moves the read pointer past all checkpoint write versions)
for (int i = 5; i < 10; i++) {
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, Collections.singleton(new byte[] { (byte) i }));
client.commitOrThrow(tx);
}
// start new tx and validate all write pointers are excluded
Transaction tx = client.startShort();
validateSorted(tx.getInProgress());
validateSorted(tx.getInvalids());
Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
client.abort(tx);
// abort one of the checkpoints
client.abort(tx1c);
// start new tx and validate all write pointers are excluded
tx = client.startShort();
validateSorted(tx.getInProgress());
validateSorted(tx.getInvalids());
Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
client.abort(tx);
// invalidate one of the checkpoints
client.invalidate(tx2c.getTransactionId());
// start new tx and validate all write pointers are excluded
tx = client.startShort();
validateSorted(tx.getInProgress());
validateSorted(tx.getInvalids());
Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
client.abort(tx);
// commit the last checkpoint
client.canCommitOrThrow(tx3, Collections.<byte[]>emptyList());
client.commitOrThrow(tx3c);
// start new tx and validate all write pointers are excluded
tx = client.startShort();
validateSorted(tx.getInProgress());
validateSorted(tx.getInvalids());
Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
client.abort(tx);
}
private void validateSorted(long[] array) {
Long lastSeen = null;
for (long value : array) {
Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
lastSeen == null || lastSeen < value);
lastSeen = value;
}
}
private void assertCommitConflicts(TransactionSystemClient client, Transaction tx)
throws TransactionFailureException {
try {
client.commitOrThrow(tx);
Assert.fail();
} catch (TransactionConflictException e) {
//expected
}
}
private void assertCanCommitConflicts(TransactionSystemClient client, Transaction tx, Collection<byte[]> changes)
throws TransactionFailureException {
try {
client.canCommitOrThrow(tx, changes);
Assert.fail();
} catch (TransactionConflictException e) {
//expected
}
}
private void assertCommitNotInProgress(TransactionSystemClient client, Transaction tx)
throws TransactionFailureException {
try {
client.commitOrThrow(tx);
Assert.fail();
} catch (TransactionNotInProgressException e) {
//expected
}
}
private void assertCanCommitNotInProgress(TransactionSystemClient client, Transaction tx, Collection<byte[]> changes)
throws TransactionFailureException {
try {
client.canCommitOrThrow(tx, changes);
Assert.fail();
} catch (TransactionNotInProgressException e) {
//expected
}
}
private Collection<byte[]> asList(byte[]... val) {
return Arrays.asList(val);
}
}