TEPHRA-235 Ensure that TransactionSnapshot always have a sorted invalid transaction list.

This closes #44

Signed-off-by: poorna <poorna@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 27b6bc6..3f332ad 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -466,7 +466,7 @@
 
   public synchronized TransactionSnapshot getCurrentState() {
     return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer,
-                                        invalidTxList.toRawList(), inProgress, committingChangeSets,
+                                        invalidTxList, inProgress, committingChangeSets,
                                         committedChangeSets);
   }
 
diff --git a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
index 231196c..5d032f6 100644
--- a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
+++ b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
@@ -110,7 +110,7 @@
   }
 
   /**
-   * @return list of invalid transactions. The list is not sorted.
+   * @return list of invalid transactions. The list is not guaranteed to be sorted.
    */
   public LongList toRawList() {
     return LongLists.unmodifiable(invalid);
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
index ccf7374..d76a98f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
@@ -19,10 +19,11 @@
 package org.apache.tephra.persist;
 
 import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.TransactionManager;
+import org.apache.tephra.manager.InvalidTxList;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -44,6 +45,18 @@
   private Map<Long, Set<ChangeId>> committingChangeSets;
   private Map<Long, Set<ChangeId>> committedChangeSets;
 
+  /**
+   * Creates an instance of TransactionSnapshot with the given transaction state
+   *
+   * @param timestamp timestamp, in millis, that the snapshot was taken
+   * @param readPointer current transaction read pointer
+   * @param writePointer current transaction write pointer
+   * @param invalid current list of invalid write pointer; must be sorted
+   * @param inProgress current map of in-progress write pointers to expiration timestamps
+   * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
+   *                   yet committed
+   * @param committed current map of write pointers to change sets which have committed
+   */
   public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
                              NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
                              Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) {
@@ -52,6 +65,15 @@
     this.committedChangeSets = committed;
   }
 
+  /**
+   * Creates an instance of TransactionSnapshot with the given transaction state
+   *
+   * @param timestamp timestamp, in millis, that the snapshot was taken
+   * @param readPointer current transaction read pointer
+   * @param writePointer current transaction write pointer
+   * @param invalid current list of invalid write pointer; must be sorted
+   * @param inProgress current map of in-progress write pointers to expiration timestamps
+   */
   public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
                              NavigableMap<Long, TransactionManager.InProgressTx> inProgress) {
     this.timestamp = timestamp;
@@ -162,9 +184,10 @@
 
   /**
    * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections.
+   * @param snapshotTime timestamp, in millis, that the snapshot was taken
    * @param readPointer current transaction read pointer
    * @param writePointer current transaction write pointer
-   * @param invalid current list of invalid write pointers
+   * @param invalidTxList current list of invalid write pointers
    * @param inProgress current map of in-progress write pointers to expiration timestamps
    * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
    *                   yet committed
@@ -172,12 +195,12 @@
    * @return a new {@code TransactionSnapshot} instance
    */
   public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
-                                             long writePointer, Collection<Long> invalid,
+                                             long writePointer, InvalidTxList invalidTxList,
                                              NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
                                              Map<Long, Set<ChangeId>> committing,
                                              NavigableMap<Long, Set<ChangeId>> committed) {
-    // copy invalid IDs
-    Collection<Long> invalidCopy = Lists.newArrayList(invalid);
+    // copy invalid IDs, after sorting
+    Collection<Long> invalidCopy = new LongArrayList(invalidTxList.toSortedArray());
     // copy in-progress IDs and expirations
     NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress);
 
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index f67c58b..18f81c8 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -20,8 +20,11 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -46,6 +49,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -303,6 +307,14 @@
     Transaction transaction = txManager.startLong();
     Transaction checkpointTx = txManager.checkpoint(transaction);
 
+    // create invalid transactions (invalidated out of order)
+    Transaction shortTx1 = txManager.startShort();
+    Transaction shortTx2 = txManager.startShort();
+    Transaction shortTx3 = txManager.startShort();
+    txManager.invalidate(shortTx3.getTransactionId());
+    txManager.invalidate(shortTx1.getTransactionId());
+    txManager.invalidate(shortTx2.getTransactionId());
+
     // shutdown to force a snapshot
     txManager.stopAndWait();
 
@@ -311,6 +323,7 @@
     txStorage.startAndWait();
 
     TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+    Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
     TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
     assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
 
@@ -339,6 +352,7 @@
 
     // state should be recovered
     snapshot = txManager.getCurrentState();
+    Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
     inProgress = snapshot.getInProgress();
     Assert.assertEquals(2, inProgress.size());
 
@@ -363,6 +377,7 @@
     txStorage2.startAndWait();
 
     snapshot = txStorage2.getLatestSnapshot();
+    Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
     Assert.assertTrue(snapshot.getInProgress().isEmpty());
     txStorage2.stopAndWait();
   }
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b25ae37..1879116 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -64,6 +64,7 @@
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -141,9 +142,11 @@
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot =
       TransactionSnapshot.copyFrom(
-        System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+        System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
         // this will set visibility upper bound to V[6]
         Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
           V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index e612e2a..abe375d 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -70,6 +70,7 @@
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -147,8 +148,10 @@
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-      System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+      System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b92bb09..f6d8e2d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-      System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+      System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 4b236fc..8dfce32 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-      System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+      System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index d21c987..9f7206d 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-        System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+        System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
         // this will set visibility upper bound to V[6]
         Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
           V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),