blob: ec065285f995e452c83c77e170eb80d872f41d85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tephra.persist;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionType;
import org.apache.tephra.TxConstants;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.util.TransactionEditUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Commons tests to run against the {@link TransactionStateStorage} implementations.
*/
public abstract class AbstractTransactionStateStorageTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class);
private static Random random = new Random();
protected abstract Configuration getConfiguration(String testName) throws IOException;
protected abstract AbstractTransactionStateStorage getStorage(Configuration conf);
@Test
public void testSnapshotPersistence() throws Exception {
Configuration conf = getConfiguration("testSnapshotPersistence");
TransactionSnapshot snapshot = createRandomSnapshot();
TransactionStateStorage storage = getStorage(conf);
try {
storage.startAndWait();
storage.writeSnapshot(snapshot);
TransactionSnapshot readSnapshot = storage.getLatestSnapshot();
assertNotNull(readSnapshot);
assertEquals(snapshot, readSnapshot);
} finally {
storage.stopAndWait();
}
}
@Test
public void testLogWriteAndRead() throws Exception {
Configuration conf = getConfiguration("testLogWriteAndRead");
// create some random entries
List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(100);
TransactionStateStorage storage = getStorage(conf);
try {
long now = System.currentTimeMillis();
storage.startAndWait();
TransactionLog log = storage.createLog(now);
for (TransactionEdit edit : edits) {
log.append(edit);
}
log.close();
Collection<TransactionLog> logsToRead = storage.getLogsSince(now);
// should only be our one log
assertNotNull(logsToRead);
assertEquals(1, logsToRead.size());
TransactionLogReader logReader = logsToRead.iterator().next().getReader();
assertNotNull(logReader);
List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size());
TransactionEdit nextEdit;
while ((nextEdit = logReader.next()) != null) {
readEdits.add(nextEdit);
}
logReader.close();
assertEquals(edits.size(), readEdits.size());
for (int i = 0; i < edits.size(); i++) {
LOG.info("Checking edit " + i);
assertEquals(edits.get(i), readEdits.get(i));
}
} finally {
storage.stopAndWait();
}
}
@Test
public void testTransactionManagerPersistence() throws Exception {
Configuration conf = getConfiguration("testTransactionManagerPersistence");
conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
// start snapshot thread, but with long enough interval so we only get snapshots on shutdown
conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 600);
TransactionStateStorage storage = null;
TransactionStateStorage storage2 = null;
TransactionStateStorage storage3 = null;
try {
storage = getStorage(conf);
TransactionManager txManager = new TransactionManager
(conf, storage, new TxMetricsCollector());
txManager.startAndWait();
// TODO: replace with new persistence tests
final byte[] a = { 'a' };
final byte[] b = { 'b' };
// Start and invalidate a transaction
Transaction invalid = txManager.startShort();
txManager.invalidate(invalid.getTransactionId());
// start a tx1, add a change A and commit
Transaction tx1 = txManager.startShort();
Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
Assert.assertTrue(txManager.commit(tx1));
// start a tx2 and add a change B
Transaction tx2 = txManager.startShort();
Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
// start a tx3
Transaction tx3 = txManager.startShort();
// restart
txManager.stopAndWait();
TransactionSnapshot origState = txManager.getCurrentState();
LOG.info("Orig state: " + origState);
Thread.sleep(100);
// starts a new tx manager
storage2 = getStorage(conf);
txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
txManager.startAndWait();
// check that the reloaded state matches the old
TransactionSnapshot newState = txManager.getCurrentState();
LOG.info("New state: " + newState);
assertEquals(origState, newState);
// Verify that the invalid transaction list matches
Transaction checkTx = txManager.startShort();
Assert.assertEquals(origState.getInvalid(), Longs.asList(checkTx.getInvalids()));
txManager.abort(checkTx);
txManager.abort(invalid);
// commit tx2
Assert.assertTrue(txManager.commit(tx2));
// start another transaction, must be greater than tx3
Transaction tx4 = txManager.startShort();
Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId());
// tx1 must be visble from tx2, but tx3 and tx4 must not
Assert.assertTrue(tx2.isVisible(tx1.getTransactionId()));
Assert.assertFalse(tx2.isVisible(tx3.getTransactionId()));
Assert.assertFalse(tx2.isVisible(tx4.getTransactionId()));
// add same change for tx3
Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b)));
// check visibility with new xaction
Transaction tx5 = txManager.startShort();
Assert.assertTrue(tx5.isVisible(tx1.getTransactionId()));
Assert.assertTrue(tx5.isVisible(tx2.getTransactionId()));
Assert.assertFalse(tx5.isVisible(tx3.getTransactionId()));
Assert.assertFalse(tx5.isVisible(tx4.getTransactionId()));
// can commit tx3?
txManager.abort(tx3);
txManager.abort(tx4);
txManager.abort(tx5);
// start new tx and verify its exclude list is empty
Transaction tx6 = txManager.startShort();
Assert.assertFalse(tx6.hasExcludes());
txManager.abort(tx6);
// now start 5 x claim size transactions
Transaction tx = txManager.startShort();
for (int i = 1; i < 50; i++) {
tx = txManager.startShort();
}
origState = txManager.getCurrentState();
Thread.sleep(100);
// simulate crash by starting a new tx manager without a stopAndWait
storage3 = getStorage(conf);
txManager = new TransactionManager(conf, storage3, new TxMetricsCollector());
txManager.startAndWait();
// verify state again matches (this time should include WAL replay)
newState = txManager.getCurrentState();
assertEquals(origState, newState);
// get a new transaction and verify it is greater
Transaction txAfter = txManager.startShort();
Assert.assertTrue(txAfter.getTransactionId() > tx.getTransactionId());
} finally {
if (storage != null) {
storage.stopAndWait();
}
if (storage2 != null) {
storage2.stopAndWait();
}
if (storage3 != null) {
storage3.stopAndWait();
}
}
}
/**
* Tests whether the committed set is advanced properly on WAL replay.
*/
@Test
public void testCommittedSetClearing() throws Exception {
Configuration conf = getConfiguration("testCommittedSetClearing");
conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 0); // no periodic snapshots
TransactionStateStorage storage1 = null;
TransactionStateStorage storage2 = null;
try {
storage1 = getStorage(conf);
TransactionManager txManager = new TransactionManager
(conf, storage1, new TxMetricsCollector());
txManager.startAndWait();
// TODO: replace with new persistence tests
final byte[] a = { 'a' };
final byte[] b = { 'b' };
// start a tx1, add a change A and commit
Transaction tx1 = txManager.startShort();
Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
Assert.assertTrue(txManager.commit(tx1));
// start a tx2 and add a change B
Transaction tx2 = txManager.startShort();
Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
// start a tx3
Transaction tx3 = txManager.startShort();
TransactionSnapshot origState = txManager.getCurrentState();
LOG.info("Orig state: " + origState);
// simulate a failure by starting a new tx manager without stopping first
storage2 = getStorage(conf);
txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
txManager.startAndWait();
// check that the reloaded state matches the old
TransactionSnapshot newState = txManager.getCurrentState();
LOG.info("New state: " + newState);
assertEquals(origState, newState);
} finally {
if (storage1 != null) {
storage1.stopAndWait();
}
if (storage2 != null) {
storage2.stopAndWait();
}
}
}
/**
* Tests removal of old snapshots and old transaction logs.
*/
@Test
public void testOldFileRemoval() throws Exception {
Configuration conf = getConfiguration("testOldFileRemoval");
TransactionStateStorage storage = null;
try {
storage = getStorage(conf);
storage.startAndWait();
long now = System.currentTimeMillis();
long writePointer = 1;
Collection<Long> invalid = Lists.newArrayList();
NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap();
Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
TransactionSnapshot snapshot = new TransactionSnapshot(now, 0, writePointer++, invalid,
inprogress, committing, committed);
TransactionEdit dummyEdit = TransactionEdit.createStarted(1, 0, Long.MAX_VALUE, TransactionType.SHORT);
// write snapshot 1
storage.writeSnapshot(snapshot);
TransactionLog log = storage.createLog(now);
log.append(dummyEdit);
log.close();
snapshot = new TransactionSnapshot(now + 1, 0, writePointer++, invalid, inprogress, committing, committed);
// write snapshot 2
storage.writeSnapshot(snapshot);
log = storage.createLog(now + 1);
log.append(dummyEdit);
log.close();
snapshot = new TransactionSnapshot(now + 2, 0, writePointer++, invalid, inprogress, committing, committed);
// write snapshot 3
storage.writeSnapshot(snapshot);
log = storage.createLog(now + 2);
log.append(dummyEdit);
log.close();
snapshot = new TransactionSnapshot(now + 3, 0, writePointer++, invalid, inprogress, committing, committed);
// write snapshot 4
storage.writeSnapshot(snapshot);
log = storage.createLog(now + 3);
log.append(dummyEdit);
log.close();
snapshot = new TransactionSnapshot(now + 4, 0, writePointer++, invalid, inprogress, committing, committed);
// write snapshot 5
storage.writeSnapshot(snapshot);
log = storage.createLog(now + 4);
log.append(dummyEdit);
log.close();
snapshot = new TransactionSnapshot(now + 5, 0, writePointer++, invalid, inprogress, committing, committed);
// write snapshot 6
storage.writeSnapshot(snapshot);
log = storage.createLog(now + 5);
log.append(dummyEdit);
log.close();
List<String> allSnapshots = storage.listSnapshots();
LOG.info("All snapshots: " + allSnapshots);
assertEquals(6, allSnapshots.size());
List<String> allLogs = storage.listLogs();
LOG.info("All logs: " + allLogs);
assertEquals(6, allLogs.size());
long oldestKept = storage.deleteOldSnapshots(3);
assertEquals(now + 3, oldestKept);
allSnapshots = storage.listSnapshots();
LOG.info("All snapshots: " + allSnapshots);
assertEquals(3, allSnapshots.size());
storage.deleteLogsOlderThan(oldestKept);
allLogs = storage.listLogs();
LOG.info("All logs: " + allLogs);
assertEquals(3, allLogs.size());
} finally {
if (storage != null) {
storage.stopAndWait();
}
}
}
@Test
public void testLongTxnEditReplay() throws Exception {
Configuration conf = getConfiguration("testLongTxnEditReplay");
TransactionStateStorage storage = null;
try {
storage = getStorage(conf);
storage.startAndWait();
// Create long running txns. Abort one of them, invalidate another, invalidate and abort the last.
long time1 = System.currentTimeMillis();
long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
TransactionEdit edit2 = TransactionEdit.createAborted(wp1, TransactionType.LONG, null);
long time2 = time1 + 100;
long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 100000, TransactionType.LONG);
TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
long time3 = time1 + 200;
long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
TransactionEdit edit7 = TransactionEdit.createAborted(wp3, TransactionType.LONG, null);
// write transaction edits
TransactionLog log = storage.createLog(time1);
log.append(edit1);
log.append(edit2);
log.append(edit3);
log.append(edit4);
log.append(edit5);
log.append(edit6);
log.append(edit7);
log.close();
// Start transaction manager
TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
txm.startAndWait();
try {
// Verify that all txns are in invalid list.
TransactionSnapshot snapshot1 = txm.getCurrentState();
assertEquals(ImmutableList.of(wp1, wp2, wp3), snapshot1.getInvalid());
assertEquals(0, snapshot1.getInProgress().size());
assertEquals(0, snapshot1.getCommittedChangeSets().size());
assertEquals(0, snapshot1.getCommittedChangeSets().size());
} finally {
txm.stopAndWait();
}
} finally {
if (storage != null) {
storage.stopAndWait();
}
}
}
@Test
public void testTruncateInvalidTxEditReplay() throws Exception {
Configuration conf = getConfiguration("testTruncateInvalidTxEditReplay");
TransactionStateStorage storage = null;
try {
storage = getStorage(conf);
storage.startAndWait();
// Create some txns, and invalidate all of them.
long time1 = System.currentTimeMillis();
long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
TransactionEdit edit2 = TransactionEdit.createInvalid(wp1);
long time2 = time1 + 100;
long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 10000, TransactionType.SHORT);
TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
long time3 = time1 + 2000;
long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
long time4 = time1 + 2100;
long wp4 = time4 * TxConstants.MAX_TX_PER_MS;
TransactionEdit edit7 = TransactionEdit.createStarted(wp4, wp4 - 10, time4 + 10000, TransactionType.SHORT);
TransactionEdit edit8 = TransactionEdit.createInvalid(wp4);
// remove wp1 and wp3 from invalid list
TransactionEdit edit9 = TransactionEdit.createTruncateInvalidTx(ImmutableSet.of(wp1, wp3));
// truncate invalid transactions before time3
TransactionEdit edit10 = TransactionEdit.createTruncateInvalidTxBefore(time3);
// write transaction edits
TransactionLog log = storage.createLog(time1);
log.append(edit1);
log.append(edit2);
log.append(edit3);
log.append(edit4);
log.append(edit5);
log.append(edit6);
log.append(edit7);
log.append(edit8);
log.append(edit9);
log.append(edit10);
log.close();
// Start transaction manager
TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
txm.startAndWait();
try {
// Only wp4 should be in invalid list.
TransactionSnapshot snapshot = txm.getCurrentState();
assertEquals(ImmutableList.of(wp4), snapshot.getInvalid());
assertEquals(0, snapshot.getInProgress().size());
assertEquals(0, snapshot.getCommittedChangeSets().size());
assertEquals(0, snapshot.getCommittedChangeSets().size());
} finally {
txm.stopAndWait();
}
} finally {
if (storage != null) {
storage.stopAndWait();
}
}
}
/**
* Generates a new snapshot object with semi-randomly populated values. This does not necessarily accurately
* represent a typical snapshot's distribution of values, as we only set an upper bound on pointer values.
*
* We generate a new snapshot with the contents:
* <ul>
* <li>readPointer = 1M + (random % 1M)</li>
* <li>writePointer = readPointer + 1000</li>
* <li>waterMark = writePointer + 1000</li>
* <li>inProgress = one each for (writePointer - 500)..writePointer, ~ 5% "long" transaction</li>
* <li>invalid = 100 randomly distributed, 0..1M</li>
* <li>committing = one each, (readPointer + 1)..(readPointer + 100)</li>
* <li>committed = one each, (readPointer - 1000)..readPointer</li>
* </ul>
* @return a new snapshot of transaction state.
*/
private TransactionSnapshot createRandomSnapshot() {
// limit readPointer to a reasonable range, but make it > 1M so we can assign enough keys below
long readPointer = (Math.abs(random.nextLong()) % 1000000L) + 1000000L;
long writePointer = readPointer + 1000L;
// generate in progress -- assume last 500 write pointer values
NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
long startPointer = writePointer - 500L;
for (int i = 0; i < 500; i++) {
long currentTime = System.currentTimeMillis();
// make some "long" transactions
if (i % 20 == 0) {
inProgress.put(startPointer + i,
new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1),
TransactionManager.InProgressType.LONG));
} else {
inProgress.put(startPointer + i,
new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L,
TransactionManager.InProgressType.SHORT));
}
}
// make 100 random invalid IDs
LongArrayList invalid = new LongArrayList();
for (int i = 0; i < 100; i++) {
invalid.add(Math.abs(random.nextLong()) % 1000000L);
}
// make 100 committing entries, 10 keys each
Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
for (int i = 0; i < 100; i++) {
committing.put(readPointer + i, generateChangeSet(10));
}
// make 1000 committed entries, 10 keys each
long startCommitted = readPointer - 1000L;
NavigableMap<Long, Set<ChangeId>> committed = Maps.newTreeMap();
for (int i = 0; i < 1000; i++) {
committed.put(startCommitted + i, generateChangeSet(10));
}
return new TransactionSnapshot(System.currentTimeMillis(), readPointer, writePointer,
invalid, inProgress, committing, committed);
}
private Set<ChangeId> generateChangeSet(int numEntries) {
Set<ChangeId> changes = Sets.newHashSet();
for (int i = 0; i < numEntries; i++) {
byte[] bytes = new byte[8];
random.nextBytes(bytes);
changes.add(new ChangeId(bytes));
}
return changes;
}
}