CURATOR-544: SessionFailedRetryPolicy
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 1720290..070d9b3 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -20,7 +20,6 @@
import org.apache.curator.connection.ThreadLocalRetryLoop;
import org.apache.curator.utils.ThreadUtils;
-import org.apache.zookeeper.KeeperException;
import java.util.concurrent.Callable;
/**
@@ -122,37 +121,6 @@
public abstract void markComplete();
/**
- * Utility - return true if the given Zookeeper result code is retry-able
- *
- * @param rc result code
- * @return true/false
- */
- public static boolean shouldRetry(int rc)
- {
- return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
- (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
- (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
- (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
- (rc == -13); // KeeperException.Code.NEWCONFIGNOQUORUM.intValue()) - using hard coded value for ZK 3.4.x compatibility
- }
-
- /**
- * Utility - return true if the given exception is retry-able
- *
- * @param exception exception to check
- * @return true/false
- */
- public static boolean isRetryException(Throwable exception)
- {
- if ( exception instanceof KeeperException )
- {
- KeeperException keeperException = (KeeperException)exception;
- return shouldRetry(keeperException.code().intValue());
- }
- return false;
- }
-
- /**
* Pass any caught exceptions here
*
* @param exception the exception
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
index bc1c244..d987f9f 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
@@ -66,7 +66,7 @@
public void takeException(Exception exception) throws Exception
{
boolean rethrow = true;
- if ( RetryLoop.isRetryException(exception) )
+ if ( retryPolicy.allowRetry(exception) )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
diff --git a/curator-client/src/main/java/org/apache/curator/RetryPolicy.java b/curator-client/src/main/java/org/apache/curator/RetryPolicy.java
index 6fca7e4..49f2e88 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryPolicy.java
@@ -18,6 +18,8 @@
*/
package org.apache.curator;
+import org.apache.zookeeper.KeeperException;
+
/**
* Abstracts the policy to use when retrying connections
*/
@@ -33,5 +35,26 @@
* @param sleeper use this to sleep - DO NOT call Thread.sleep
* @return true/false
*/
- public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
+ boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
+
+ /**
+ * Called when an operation has failed with a specific exception. This method
+ * should return true to make another attempt.
+ *
+ * @param exception the cause that this operation failed
+ * @return true/false
+ */
+ default boolean allowRetry(Throwable exception)
+ {
+ if ( exception instanceof KeeperException)
+ {
+ final int rc = ((KeeperException) exception).code().intValue();
+ return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
+ (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
+ (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
+ (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
+ (rc == -13); // KeeperException.Code.NEWCONFIGNOQUORUM.intValue()) - using hard coded value for ZK 3.4.x compatibility
+ }
+ return false;
+ }
}
diff --git a/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java b/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java
new file mode 100644
index 0000000..77ad7be
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java
@@ -0,0 +1,36 @@
+package org.apache.curator;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * {@link RetryPolicy} implementation that failed on session expired.
+ */
+public class SessionFailedRetryPolicy implements RetryPolicy
+{
+
+ private final RetryPolicy delegatePolicy;
+
+ public SessionFailedRetryPolicy(RetryPolicy delegatePolicy)
+ {
+ this.delegatePolicy = delegatePolicy;
+ }
+
+ @Override
+ public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
+ {
+ return delegatePolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
+ }
+
+ @Override
+ public boolean allowRetry(Throwable exception)
+ {
+ if ( exception instanceof KeeperException.SessionExpiredException )
+ {
+ return false;
+ }
+ else
+ {
+ return delegatePolicy.allowRetry(exception);
+ }
+ }
+}
diff --git a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java
index 17bb91e..0922dff 100644
--- a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java
+++ b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java
@@ -22,7 +22,9 @@
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -162,4 +164,55 @@
Mockito.verify(sleeper, times(i + 1)).sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
}
}
+
+ @Test
+ public void testRetryForeverWithSessionFailed() throws Exception
+ {
+ final Timing timing = new Timing();
+ final RetryPolicy retryPolicy = new SessionFailedRetryPolicy(new RetryForever(1000));
+ final CuratorZookeeperClient client = new CuratorZookeeperClient(server.getConnectString(), timing.session(), timing.connection(), null, retryPolicy);
+ client.start();
+
+ try
+ {
+ int loopCount = 0;
+ final RetryLoop retryLoop = client.newRetryLoop();
+ while ( retryLoop.shouldContinue() )
+ {
+ if ( ++loopCount > 1 )
+ {
+ break;
+ }
+
+ try
+ {
+ client.getZooKeeper().getTestable().injectSessionExpiration();
+ client.getZooKeeper().create("/test", new byte[]{1,2,3}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ retryLoop.markComplete();
+ }
+ catch ( Exception e )
+ {
+ retryLoop.takeException(e);
+ }
+ }
+
+ Assert.fail("Should failed with SessionExpiredException.");
+ }
+ catch ( Exception e )
+ {
+ if ( e instanceof KeeperException )
+ {
+ int rc = ((KeeperException) e).code().intValue();
+ Assert.assertEquals(rc, KeeperException.Code.SESSIONEXPIRED.intValue());
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ finally
+ {
+ client.close();
+ }
+ }
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ee5c541..2ccc173 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -20,7 +20,6 @@
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.curator.RetryLoop;
@@ -36,14 +35,12 @@
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -770,7 +767,7 @@
}
catch ( KeeperException e )
{
- if ( !RetryLoop.isRetryException(e) )
+ if ( !client.getZookeeperClient().getRetryPolicy().allowRetry(e) )
{
throw e;
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1abfc28..e704f02 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -637,7 +637,8 @@
boolean doQueueOperation = false;
do
{
- if ( RetryLoop.shouldRetry(event.getResultCode()) )
+ final KeeperException ke = KeeperException.create(event.getResultCode());
+ if ( getZookeeperClient().getRetryPolicy().allowRetry(ke) )
{
doQueueOperation = checkBackgroundRetry(operationAndData, event);
break;
@@ -901,7 +902,7 @@
{
do
{
- if ( (operationAndData != null) && RetryLoop.isRetryException(e) )
+ if ( (operationAndData != null) && getZookeeperClient().getRetryPolicy().allowRetry(e) )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 4deaf70..3cc54cd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -300,7 +300,7 @@
{
ThreadUtils.checkInterrupted(e);
//Only retry a guaranteed delete if it's a retryable error
- if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed )
+ if ( (client.getZookeeperClient().getRetryPolicy().allowRetry(e) || (e instanceof InterruptedException)) && guaranteed )
{
client.getFailedDeleteManager().addFailedOperation(unfixedPath);
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index 961d5f0..3c2c35d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -270,13 +270,13 @@
}
catch(Exception e)
{
- if( RetryLoop.isRetryException(e) && guaranteed )
+ if ( client.getZookeeperClient().getRetryPolicy().allowRetry(e) && guaranteed )
{
//Setup the guaranteed handler
client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, finalNamespaceWatcher));
throw e;
}
- else if(e instanceof KeeperException.NoWatcherException && quietly)
+ else if (e instanceof KeeperException.NoWatcherException && quietly)
{
// ignore
}
@@ -349,4 +349,4 @@
backgrounding.checkError(e, null);
}
}
-}
\ No newline at end of file
+}