TEPHRA-235 Ensure that TransactionSnapshot always have a sorted invalid transaction list.
This closes #44
Signed-off-by: poorna <poorna@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 27b6bc6..3f332ad 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -466,7 +466,7 @@
public synchronized TransactionSnapshot getCurrentState() {
return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer,
- invalidTxList.toRawList(), inProgress, committingChangeSets,
+ invalidTxList, inProgress, committingChangeSets,
committedChangeSets);
}
diff --git a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
index 231196c..5d032f6 100644
--- a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
+++ b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
@@ -110,7 +110,7 @@
}
/**
- * @return list of invalid transactions. The list is not sorted.
+ * @return list of invalid transactions. The list is not guaranteed to be sorted.
*/
public LongList toRawList() {
return LongLists.unmodifiable(invalid);
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
index ccf7374..d76a98f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
@@ -19,10 +19,11 @@
package org.apache.tephra.persist;
import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.tephra.ChangeId;
import org.apache.tephra.TransactionManager;
+import org.apache.tephra.manager.InvalidTxList;
import java.util.Collection;
import java.util.Collections;
@@ -44,6 +45,18 @@
private Map<Long, Set<ChangeId>> committingChangeSets;
private Map<Long, Set<ChangeId>> committedChangeSets;
+ /**
+ * Creates an instance of TransactionSnapshot with the given transaction state
+ *
+ * @param timestamp timestamp, in millis, that the snapshot was taken
+ * @param readPointer current transaction read pointer
+ * @param writePointer current transaction write pointer
+ * @param invalid current list of invalid write pointer; must be sorted
+ * @param inProgress current map of in-progress write pointers to expiration timestamps
+ * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
+ * yet committed
+ * @param committed current map of write pointers to change sets which have committed
+ */
public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) {
@@ -52,6 +65,15 @@
this.committedChangeSets = committed;
}
+ /**
+ * Creates an instance of TransactionSnapshot with the given transaction state
+ *
+ * @param timestamp timestamp, in millis, that the snapshot was taken
+ * @param readPointer current transaction read pointer
+ * @param writePointer current transaction write pointer
+ * @param invalid current list of invalid write pointer; must be sorted
+ * @param inProgress current map of in-progress write pointers to expiration timestamps
+ */
public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
NavigableMap<Long, TransactionManager.InProgressTx> inProgress) {
this.timestamp = timestamp;
@@ -162,9 +184,10 @@
/**
* Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections.
+ * @param snapshotTime timestamp, in millis, that the snapshot was taken
* @param readPointer current transaction read pointer
* @param writePointer current transaction write pointer
- * @param invalid current list of invalid write pointers
+ * @param invalidTxList current list of invalid write pointers
* @param inProgress current map of in-progress write pointers to expiration timestamps
* @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
* yet committed
@@ -172,12 +195,12 @@
* @return a new {@code TransactionSnapshot} instance
*/
public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
- long writePointer, Collection<Long> invalid,
+ long writePointer, InvalidTxList invalidTxList,
NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
Map<Long, Set<ChangeId>> committing,
NavigableMap<Long, Set<ChangeId>> committed) {
- // copy invalid IDs
- Collection<Long> invalidCopy = Lists.newArrayList(invalid);
+ // copy invalid IDs, after sorting
+ Collection<Long> invalidCopy = new LongArrayList(invalidTxList.toSortedArray());
// copy in-progress IDs and expirations
NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress);
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index f67c58b..18f81c8 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -20,8 +20,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -46,6 +49,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -303,6 +307,14 @@
Transaction transaction = txManager.startLong();
Transaction checkpointTx = txManager.checkpoint(transaction);
+ // create invalid transactions (invalidated out of order)
+ Transaction shortTx1 = txManager.startShort();
+ Transaction shortTx2 = txManager.startShort();
+ Transaction shortTx3 = txManager.startShort();
+ txManager.invalidate(shortTx3.getTransactionId());
+ txManager.invalidate(shortTx1.getTransactionId());
+ txManager.invalidate(shortTx2.getTransactionId());
+
// shutdown to force a snapshot
txManager.stopAndWait();
@@ -311,6 +323,7 @@
txStorage.startAndWait();
TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+ Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
@@ -339,6 +352,7 @@
// state should be recovered
snapshot = txManager.getCurrentState();
+ Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
inProgress = snapshot.getInProgress();
Assert.assertEquals(2, inProgress.size());
@@ -363,6 +377,7 @@
txStorage2.startAndWait();
snapshot = txStorage2.getLatestSnapshot();
+ Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
Assert.assertTrue(snapshot.getInProgress().isEmpty());
txStorage2.stopAndWait();
}
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b25ae37..1879116 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -64,6 +64,7 @@
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.TransactionSnapshot;
@@ -141,9 +142,11 @@
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
// write an initial transaction snapshot
+ InvalidTxList invalidTxList = new InvalidTxList();
+ invalidTxList.addAll(invalidSet);
TransactionSnapshot txSnapshot =
TransactionSnapshot.copyFrom(
- System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+ System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index e612e2a..abe375d 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -70,6 +70,7 @@
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.TransactionSnapshot;
@@ -147,8 +148,10 @@
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
// write an initial transaction snapshot
+ InvalidTxList invalidTxList = new InvalidTxList();
+ invalidTxList.addAll(invalidSet);
TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
- System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+ System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b92bb09..f6d8e2d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
// write an initial transaction snapshot
+ InvalidTxList invalidTxList = new InvalidTxList();
+ invalidTxList.addAll(invalidSet);
TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
- System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+ System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 4b236fc..8dfce32 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
// write an initial transaction snapshot
+ InvalidTxList invalidTxList = new InvalidTxList();
+ invalidTxList.addAll(invalidSet);
TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
- System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+ System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index d21c987..9f7206d 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
// write an initial transaction snapshot
+ InvalidTxList invalidTxList = new InvalidTxList();
+ invalidTxList.addAll(invalidSet);
TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
- System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+ System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),