(TEPHRA-253) Fix flaky TransactionProcessorTest

This closes #54 from GitHub.

Signed-off-by: anew <anew@apache.org>
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 1879116..3c7d1e2 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -74,6 +74,7 @@
 import org.apache.tephra.util.TxUtils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -175,7 +176,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -190,7 +191,7 @@
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      region.flushcache(); // in 0.96, there is no indication of success
 
       // now a normal scan should only return the valid rows - testing that cleanup works on flush
       Scan scan = new Scan();
@@ -231,7 +232,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -620,6 +621,21 @@
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) {
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting foe transaction state cache");
+    Assert.fail("Timed out waiting foe transaction state cache");
+    return null;
+  }
+
   private static class MockRegionServerServices implements RegionServerServices {
     private final Configuration hConf;
     private final ZooKeeperWatcher zookeeper;
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index abe375d..b8e051b 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -80,6 +80,7 @@
 import org.apache.tephra.util.TxUtils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -180,7 +181,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -195,7 +196,8 @@
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache();
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -237,7 +239,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -624,6 +626,21 @@
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) {
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting foe transaction state cache");
+    Assert.fail("Timed out waiting foe transaction state cache");
+    return null;
+  }
+
   private static class MockRegionServerServices implements RegionServerServices {
     private final Configuration hConf;
     private final ZooKeeperWatcher zookeeper;
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index f6d8e2d..9ce30b5 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +161,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache();
+      Assert.assertTrue("Unexpected flush result: " + flushResult.toString(), flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) {
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting foe transaction state cache");
+    Assert.fail("Timed out waiting foe transaction state cache");
+    return null;
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;
 
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 8dfce32..0ec3b46 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +161,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache();
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) {
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting foe transaction state cache");
+    Assert.fail("Timed out waiting foe transaction state cache");
+    return null;
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;
 
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 9f7206d..f133735 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +162,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -174,10 +176,10 @@
 
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
-
       LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
-      region.flushcache(true, false);
-
+      Region.FlushResult flushResult = region.flushcache(true, false);
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
+      
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
       Scan scan = new Scan();
@@ -218,7 +220,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -608,6 +610,20 @@
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) {
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    Assert.fail("Timed out waiting foe transaction state cache");
+    return null;
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;
 
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 15842a3..4c8fa64 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +162,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -176,7 +178,8 @@
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
 
       LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
-      region.flushcache(true, false);
+      Region.FlushResult flushResult = region.flushcache(true, false);
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -218,7 +221,7 @@
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -608,6 +611,21 @@
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) {
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting foe transaction state cache");
+    Assert.fail("Timed out waiting foe transaction state cache");
+    return null;
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;