(TEPHRA-253) Fix flaky TransactionProcessorTest
This closes #54 from GitHub.
Signed-off-by: anew <anew@apache.org>
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 1879116..3c7d1e2 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -74,6 +74,7 @@
import org.apache.tephra.util.TxUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -175,7 +176,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -190,7 +191,7 @@
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ region.flushcache(); // in 0.96, there is no indication of success
// now a normal scan should only return the valid rows - testing that cleanup works on flush
Scan scan = new Scan();
@@ -231,7 +232,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -620,6 +621,21 @@
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class MockRegionServerServices implements RegionServerServices {
private final Configuration hConf;
private final ZooKeeperWatcher zookeeper;
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index abe375d..b8e051b 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -80,6 +80,7 @@
import org.apache.tephra.util.TxUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -180,7 +181,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -195,7 +196,8 @@
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache();
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -237,7 +239,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -624,6 +626,21 @@
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class MockRegionServerServices implements RegionServerServices {
private final Configuration hConf;
private final ZooKeeperWatcher zookeeper;
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index f6d8e2d..9ce30b5 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +161,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache();
+ Assert.assertTrue("Unexpected flush result: " + flushResult.toString(), flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 8dfce32..0ec3b46 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +161,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache();
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 9f7206d..f133735 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +162,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -174,10 +176,10 @@
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
-
LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
- region.flushcache(true, false);
-
+ Region.FlushResult flushResult = region.flushcache(true, false);
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
+
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
Scan scan = new Scan();
@@ -218,7 +220,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -608,6 +610,20 @@
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 15842a3..4c8fa64 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +162,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -176,7 +178,8 @@
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
- region.flushcache(true, false);
+ Region.FlushResult flushResult = region.flushcache(true, false);
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -218,7 +221,7 @@
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -608,6 +611,21 @@
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;