Introduce new retry policy for metadata operations (#2470)

Descriptions of the changes in this PR:

### Motivation
Current retry backoff policy is truly exponential, hence retries are spaced out at large intervals. There is maxBackOff time that limits it, but it doesn't have a concept of deadline. Hence, applications with shorter SLA's have issues dealing with ZK failures and recovering from it.

### Changes

The new policy introduces an aggressive retry mechanism with the backoff constants as less exponential. The deadline ensures that we retry at least once at the deadline. The stack trace would indicate the operation at risk.

This doesn't update the product code, which is still in testing. I am splitting PR's for for sake of reviews.


* Introduce new retry policy for metadata operations

* Spotbugs error fix
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackOffWithDeadlinePolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackOffWithDeadlinePolicy.java
new file mode 100644
index 0000000..8fa4001
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackOffWithDeadlinePolicy.java
@@ -0,0 +1,73 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import java.util.Arrays;
+import java.util.Random;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Backoff time determined based as a multiple of baseBackoffTime.
+ * The multiple value depends on retryCount.
+ * If the retry schedule exceeds the deadline, we schedule a final attempt exactly at the deadline.
+ */
+@Slf4j
+public class ExponentialBackOffWithDeadlinePolicy implements RetryPolicy {
+
+    static final int [] RETRY_BACKOFF = {0, 1, 2, 3, 5, 5, 5, 10, 10, 10, 20, 40, 100};
+    public static final int JITTER_PERCENT = 10;
+    private final Random random;
+
+    private final long baseBackoffTime;
+    private final long deadline;
+    private final int maxRetries;
+
+    public ExponentialBackOffWithDeadlinePolicy(long baseBackoffTime, long deadline, int maxRetries) {
+        this.baseBackoffTime = baseBackoffTime;
+        this.deadline = deadline;
+        this.maxRetries = maxRetries;
+        this.random = new Random(System.currentTimeMillis());
+    }
+
+    @Override
+    public boolean allowRetry(int retryCount, long elapsedRetryTime) {
+        return retryCount <= maxRetries && elapsedRetryTime < deadline;
+    }
+
+    @Override
+    public long nextRetryWaitTime(int retryCount, long elapsedRetryTime) {
+        int idx = retryCount;
+        if (idx >= RETRY_BACKOFF.length) {
+            idx = RETRY_BACKOFF.length - 1;
+        }
+
+        long waitTime = (baseBackoffTime * RETRY_BACKOFF[idx]);
+        long jitter = (random.nextInt(JITTER_PERCENT) * waitTime / 100);
+
+        if (elapsedRetryTime + waitTime + jitter > deadline) {
+            log.warn("Final retry attempt: {}, timeleft: {}, stacktrace: {}",
+                    retryCount, (deadline - elapsedRetryTime), Arrays.toString(Thread.currentThread().getStackTrace()));
+            return deadline - elapsedRetryTime;
+        }
+
+        return waitTime + jitter;
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestRetryPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestRetryPolicy.java
index 2ed3eee..a1ff3f0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestRetryPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestRetryPolicy.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.zookeeper;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.junit.Test;
@@ -55,4 +57,32 @@
         assertTimeRange(policy.nextRetryWaitTime(128, 2000), 1000L, 2000L);
         assertTimeRange(policy.nextRetryWaitTime(129, 2000), 1000L, 2000L);
     }
+
+    @Test
+    public void testExponentialBackoffWithDeadlineRetryPolicy() throws Exception {
+        RetryPolicy policy = new ExponentialBackOffWithDeadlinePolicy(100, 55 * 1000, 20);
+
+        // Retries are allowed as long as we don't exceed the limits of retry count and deadline
+        assertTrue(policy.allowRetry(1, 5 * 1000));
+        assertTrue(policy.allowRetry(4, 20 * 1000));
+        assertTrue(policy.allowRetry(10, 50 * 1000));
+
+        assertFalse(policy.allowRetry(0, 60 * 1000));
+        assertFalse(policy.allowRetry(22, 20 * 1000));
+        assertFalse(policy.allowRetry(22, 60 * 1000));
+
+        // Verify that the wait times are in the range and with the excepted jitter, until deadline is exceeded
+        assertTimeRange(policy.nextRetryWaitTime(0, 0), 0, 0);
+        assertTimeRange(policy.nextRetryWaitTime(1, 0), 100, 110);
+        assertTimeRange(policy.nextRetryWaitTime(1, 53 * 1000), 100, 110);
+        assertTimeRange(policy.nextRetryWaitTime(2, 0), 200, 220);
+        assertTimeRange(policy.nextRetryWaitTime(3, 0), 300, 330);
+        assertTimeRange(policy.nextRetryWaitTime(3, 53 * 1000), 300, 330);
+        assertTimeRange(policy.nextRetryWaitTime(4, 0), 500, 550);
+        assertTimeRange(policy.nextRetryWaitTime(5, 0), 500, 550);
+
+        // Verify that the final attempt is triggered at deadline.
+        assertEquals(2000, policy.nextRetryWaitTime(10, 53 * 1000));
+        assertEquals(4000, policy.nextRetryWaitTime(15, 51 * 1000));
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZKClientBoundExpBackoffRP.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZKClientBoundExpBackoffRP.java
new file mode 100644
index 0000000..514c48d
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZKClientBoundExpBackoffRP.java
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.bookkeeper.test.ZooKeeperCluster;
+import org.apache.bookkeeper.test.ZooKeeperClusterUtil;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.zookeeper.KeeperException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test zk client resiliency with BoundExponentialBackoffRetryPolicy.
+ */
+@RunWith(Parameterized.class)
+public class TestZKClientBoundExpBackoffRP extends TestZooKeeperClient {
+
+    public TestZKClientBoundExpBackoffRP(Class<? extends ZooKeeperCluster> zooKeeperUtilClass,
+                                         Class<? extends RetryPolicy> retryPolicyClass)
+            throws IOException, KeeperException, InterruptedException {
+        super(zooKeeperUtilClass, retryPolicyClass);
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> zooKeeperUtilClass() {
+        return Arrays.asList(new Object[][] { { ZooKeeperUtil.class, BoundExponentialBackoffRetryPolicy.class },
+                { ZooKeeperClusterUtil.class, BoundExponentialBackoffRetryPolicy.class } });
+    }
+
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZKClientExpBackoffWithDeadlineRP.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZKClientExpBackoffWithDeadlineRP.java
new file mode 100644
index 0000000..029ad6d
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZKClientExpBackoffWithDeadlineRP.java
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.bookkeeper.test.ZooKeeperCluster;
+import org.apache.bookkeeper.test.ZooKeeperClusterUtil;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.zookeeper.KeeperException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test zk client resiliency with ExponentialBackOffWithDeadlinePolicy.
+ */
+@RunWith(Parameterized.class)
+public class TestZKClientExpBackoffWithDeadlineRP extends TestZooKeeperClient {
+
+    public TestZKClientExpBackoffWithDeadlineRP(Class<? extends ZooKeeperCluster> zooKeeperUtilClass,
+                                                Class<? extends RetryPolicy> retryPolicyClass)
+            throws IOException, KeeperException, InterruptedException {
+        super(zooKeeperUtilClass, retryPolicyClass);
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> zooKeeperUtilClass() {
+        return Arrays.asList(new Object[][] { { ZooKeeperUtil.class, ExponentialBackOffWithDeadlinePolicy.class },
+                { ZooKeeperClusterUtil.class, ExponentialBackOffWithDeadlinePolicy.class } });
+    }
+
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
index fca65ba..b0b59a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
@@ -22,8 +22,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -60,7 +58,6 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,7 +65,7 @@
  * Test the wrapper of {@link org.apache.zookeeper.ZooKeeper} client.
  */
 @RunWith(Parameterized.class)
-public class TestZooKeeperClient extends TestCase {
+public abstract class TestZooKeeperClient extends TestCase {
 
     static {
         ZooKeeperClusterUtil.enableZookeeperTestEnvVariables();
@@ -78,19 +75,23 @@
 
     // ZooKeeper related variables
     protected ZooKeeperCluster zkUtil;
+    private RetryPolicy retryPolicy;
 
-    @Parameters
-    public static Collection<Object[]> zooKeeperUtilClass() {
-        return Arrays.asList(new Object[][] { { ZooKeeperUtil.class }, { ZooKeeperClusterUtil.class } });
-    }
-
-    public TestZooKeeperClient(Class<? extends ZooKeeperCluster> zooKeeperUtilClass)
+    public TestZooKeeperClient(Class<? extends ZooKeeperCluster> zooKeeperUtilClass,
+                               Class<? extends RetryPolicy> retryPolicyClass)
             throws IOException, KeeperException, InterruptedException {
         if (zooKeeperUtilClass.equals(ZooKeeperUtil.class)) {
             zkUtil = new ZooKeeperUtil();
         } else {
             zkUtil = new ZooKeeperClusterUtil(3);
         }
+        if (retryPolicyClass.equals(BoundExponentialBackoffRetryPolicy.class)) {
+            retryPolicy = new BoundExponentialBackoffRetryPolicy(2000,
+                    2000, Integer.MAX_VALUE);
+        } else {
+            retryPolicy = new ExponentialBackOffWithDeadlinePolicy(100,
+                    20 * 1000, Integer.MAX_VALUE);
+        }
     }
 
     @Before
@@ -177,8 +178,9 @@
         watchers.add(testWatcher);
         ZooKeeperClient client = new ShutdownZkServerClient(
                 zkUtil.getZooKeeperConnectString(), timeout, watcherManager,
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, 0)
-                );
+                ((retryPolicy instanceof BoundExponentialBackoffRetryPolicy)
+                        ? new BoundExponentialBackoffRetryPolicy(timeout, timeout, 0) :
+                        new ExponentialBackOffWithDeadlinePolicy(100, 20 * 1000, 0)));
         client.waitForConnection();
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.",
                 client.getState().isConnected());
@@ -220,9 +222,7 @@
     public void testRetrySyncOperations() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(
-                zkUtil.getZooKeeperConnectString(), timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE)
-                );
+                zkUtil.getZooKeeperConnectString(), timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.",
                 client.getState().isConnected());
 
@@ -269,8 +269,7 @@
     public void testSyncAfterSessionExpiry() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(zkUtil.getZooKeeperConnectString(),
-                timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE));
+                timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.", client.getState().isConnected());
 
         String path = "/testSyncAfterSessionExpiry";
@@ -313,8 +312,7 @@
     public void testACLSetAndGet() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(zkUtil.getZooKeeperConnectString(),
-                timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE));
+                timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.", client.getState().isConnected());
 
         String path = "/testACLSetAndGet";
@@ -388,8 +386,7 @@
     public void testACLSetAndGetAfterSessionExpiry() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(zkUtil.getZooKeeperConnectString(),
-                timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE));
+                timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.", client.getState().isConnected());
 
         String path = "/testACLSetAndGetAfterSessionExpiry";
@@ -478,8 +475,7 @@
     public void testZnodeExists() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(zkUtil.getZooKeeperConnectString(),
-                timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE));
+                timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.", client.getState().isConnected());
 
         String path = "/testZnodeExists";
@@ -535,8 +531,7 @@
     public void testGetSetData() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(zkUtil.getZooKeeperConnectString(),
-                timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE));
+                timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.", client.getState().isConnected());
 
         String path = "/testGetSetData";
@@ -618,8 +613,7 @@
     public void testGetChildren() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(zkUtil.getZooKeeperConnectString(),
-                timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE));
+                timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.", client.getState().isConnected());
 
         // create a root node
@@ -764,9 +758,7 @@
     public void testRetryOnCreatingEphemeralZnode() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(
-                zkUtil.getZooKeeperConnectString(), timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE)
-                );
+                zkUtil.getZooKeeperConnectString(), timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.",
                 client.getState().isConnected());
 
@@ -787,9 +779,7 @@
     public void testRetryAsyncOperations() throws Exception {
         final int timeout = 2000;
         ZooKeeperClient client = ZooKeeperClient.createConnectedZooKeeperClient(
-                zkUtil.getZooKeeperConnectString(), timeout, new HashSet<Watcher>(),
-                new BoundExponentialBackoffRetryPolicy(timeout, timeout, Integer.MAX_VALUE)
-                );
+                zkUtil.getZooKeeperConnectString(), timeout, new HashSet<Watcher>(), retryPolicy);
         Assert.assertTrue("Client failed to connect an alive ZooKeeper.",
                 client.getState().isConnected());