CURATOR-544: SessionFailedRetryPolicy
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 1720290..070d9b3 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -20,7 +20,6 @@
 
 import org.apache.curator.connection.ThreadLocalRetryLoop;
 import org.apache.curator.utils.ThreadUtils;
-import org.apache.zookeeper.KeeperException;
 import java.util.concurrent.Callable;
 
 /**
@@ -122,37 +121,6 @@
     public abstract void markComplete();
 
     /**
-     * Utility - return true if the given Zookeeper result code is retry-able
-     *
-     * @param rc result code
-     * @return true/false
-     */
-    public static boolean shouldRetry(int rc)
-    {
-        return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
-            (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
-            (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
-            (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
-            (rc == -13); // KeeperException.Code.NEWCONFIGNOQUORUM.intValue()) - using hard coded value for ZK 3.4.x compatibility
-    }
-
-    /**
-     * Utility - return true if the given exception is retry-able
-     *
-     * @param exception exception to check
-     * @return true/false
-     */
-    public static boolean isRetryException(Throwable exception)
-    {
-        if ( exception instanceof KeeperException )
-        {
-            KeeperException keeperException = (KeeperException)exception;
-            return shouldRetry(keeperException.code().intValue());
-        }
-        return false;
-    }
-
-    /**
      * Pass any caught exceptions here
      *
      * @param exception the exception
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
index bc1c244..d987f9f 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
@@ -66,7 +66,7 @@
     public void takeException(Exception exception) throws Exception
     {
         boolean rethrow = true;
-        if ( RetryLoop.isRetryException(exception) )
+        if ( retryPolicy.allowRetry(exception) )
         {
             if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
             {
diff --git a/curator-client/src/main/java/org/apache/curator/RetryPolicy.java b/curator-client/src/main/java/org/apache/curator/RetryPolicy.java
index 6fca7e4..49f2e88 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryPolicy.java
@@ -18,6 +18,8 @@
  */
 package org.apache.curator;
 
+import org.apache.zookeeper.KeeperException;
+
 /**
  * Abstracts the policy to use when retrying connections
  */
@@ -33,5 +35,26 @@
      * @param sleeper use this to sleep - DO NOT call Thread.sleep
      * @return true/false
      */
-    public boolean      allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
+    boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
+
+    /**
+     * Called when an operation has failed with a specific exception. This method
+     * should return true to make another attempt.
+     *
+     * @param exception the cause that this operation failed
+     * @return true/false
+     */
+    default boolean allowRetry(Throwable exception)
+    {
+        if ( exception instanceof KeeperException)
+        {
+            final int rc = ((KeeperException) exception).code().intValue();
+            return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
+                    (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
+                    (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
+                    (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
+                    (rc == -13); // KeeperException.Code.NEWCONFIGNOQUORUM.intValue()) - using hard coded value for ZK 3.4.x compatibility
+        }
+        return false;
+    }
 }
diff --git a/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java b/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java
new file mode 100644
index 0000000..77ad7be
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java
@@ -0,0 +1,36 @@
+package org.apache.curator;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * {@link RetryPolicy} implementation that failed on session expired.
+ */
+public class SessionFailedRetryPolicy implements RetryPolicy
+{
+
+    private final RetryPolicy delegatePolicy;
+
+    public SessionFailedRetryPolicy(RetryPolicy delegatePolicy)
+    {
+        this.delegatePolicy = delegatePolicy;
+    }
+
+    @Override
+    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
+    {
+        return delegatePolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
+    }
+
+    @Override
+    public boolean allowRetry(Throwable exception)
+    {
+        if ( exception instanceof KeeperException.SessionExpiredException )
+        {
+            return false;
+        }
+        else
+        {
+            return delegatePolicy.allowRetry(exception);
+        }
+    }
+}
diff --git a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java
index 17bb91e..0922dff 100644
--- a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java
+++ b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java
@@ -22,7 +22,9 @@
 import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -162,4 +164,55 @@
             Mockito.verify(sleeper, times(i + 1)).sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
         }
     }
+
+    @Test
+    public void testRetryForeverWithSessionFailed() throws Exception
+    {
+        final Timing timing = new Timing();
+        final RetryPolicy retryPolicy = new SessionFailedRetryPolicy(new RetryForever(1000));
+        final CuratorZookeeperClient client = new CuratorZookeeperClient(server.getConnectString(), timing.session(), timing.connection(), null, retryPolicy);
+        client.start();
+
+        try
+        {
+            int loopCount = 0;
+            final RetryLoop retryLoop = client.newRetryLoop();
+            while ( retryLoop.shouldContinue()  )
+            {
+                if ( ++loopCount > 1 )
+                {
+                    break;
+                }
+
+                try
+                {
+                    client.getZooKeeper().getTestable().injectSessionExpiration();
+                    client.getZooKeeper().create("/test", new byte[]{1,2,3}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    retryLoop.markComplete();
+                }
+                catch ( Exception e )
+                {
+                    retryLoop.takeException(e);
+                }
+            }
+
+            Assert.fail("Should failed with SessionExpiredException.");
+        }
+        catch ( Exception e )
+        {
+            if ( e instanceof KeeperException )
+            {
+                int rc = ((KeeperException) e).code().intValue();
+                Assert.assertEquals(rc, KeeperException.Code.SESSIONEXPIRED.intValue());
+            }
+            else
+            {
+                throw e;
+            }
+        }
+        finally
+        {
+            client.close();
+        }
+    }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ee5c541..2ccc173 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.curator.RetryLoop;
@@ -36,14 +35,12 @@
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -770,7 +767,7 @@
                 }
                 catch ( KeeperException e )
                 {
-                    if ( !RetryLoop.isRetryException(e) )
+                    if ( !client.getZookeeperClient().getRetryPolicy().allowRetry(e) )
                     {
                         throw e;
                     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1abfc28..e704f02 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -637,7 +637,8 @@
         boolean doQueueOperation = false;
         do
         {
-            if ( RetryLoop.shouldRetry(event.getResultCode()) )
+            final KeeperException ke = KeeperException.create(event.getResultCode());
+            if ( getZookeeperClient().getRetryPolicy().allowRetry(ke) )
             {
                 doQueueOperation = checkBackgroundRetry(operationAndData, event);
                 break;
@@ -901,7 +902,7 @@
     {
         do
         {
-            if ( (operationAndData != null) && RetryLoop.isRetryException(e) )
+            if ( (operationAndData != null) && getZookeeperClient().getRetryPolicy().allowRetry(e) )
             {
                 if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                 {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 4deaf70..3cc54cd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -300,7 +300,7 @@
         {
             ThreadUtils.checkInterrupted(e);
             //Only retry a guaranteed delete if it's a retryable error
-            if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed )
+            if ( (client.getZookeeperClient().getRetryPolicy().allowRetry(e) || (e instanceof InterruptedException)) && guaranteed )
             {
                 client.getFailedDeleteManager().addFailedOperation(unfixedPath);
             }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index 961d5f0..3c2c35d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -270,13 +270,13 @@
                             }

                             catch(Exception e)

                             {

-                                if( RetryLoop.isRetryException(e) && guaranteed )

+                                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(e) && guaranteed )

                                 {

                                     //Setup the guaranteed handler

                                     client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, finalNamespaceWatcher));

                                     throw e;

                                 }

-                                else if(e instanceof KeeperException.NoWatcherException && quietly)

+                                else if (e instanceof KeeperException.NoWatcherException && quietly)

                                 {

                                     // ignore

                                 }

@@ -349,4 +349,4 @@
             backgrounding.checkError(e, null);

         }

     }

-}
\ No newline at end of file
+}