[bookie-server] add support of client auto cert refresh

### Motivation
as described at: https://github.com/apache/pulsar/issues/6010

Bookkeeper-client caches the tls certificates when it first tries to create a cnx with a given bookie and after that it never reloads certs even when valid certs changes on the file-system or new bookie-connection is created. Because of that as soon as client certs expires and bk-client disconnects from bookie then bk-client is not able to reconnect to bookie until we restart the bk-client process. and we see below TLS exception at bk-client.

```
19:43:03.983 [bookkeeper-io-12-45] ERROR org.apache.bookkeeper.proto.PerChannelBookieClient - Unexpected exception caught by bookie client channel handler
io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: General OpenSslEngine problem
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:472) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: javax.net.ssl.SSLHandshakeException: General OpenSslEngine problem
        at io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$OpenSslClientCertificateCallback.handle(ReferenceCountedOpenSslClientContext.java:273) ~[netty-all-4.1.32.Final.jar
:4.1.32.Final]
        at io.netty.internal.tcnative.SSL.readFromSSL(Native Method) ~[netty-tcnative-boringssl-static-2.0.20.Final.jar:2.0.20.Final]
        at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.readPlaintextData(ReferenceCountedOpenSslEngine.java:575) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap(ReferenceCountedOpenSslEngine.java:1124) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap(ReferenceCountedOpenSslEngine.java:1236) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap(ReferenceCountedOpenSslEngine.java:1279) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler$SslEngineType$1.unwrap(SslHandler.java:217) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1301) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1215) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1249) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
        ... 14 more
```

### Modification
Add support at bk-client to reload certs once they have changed on file-system.

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Jia Zhai <zhaijia@apache.org>, Sijie Guo <None>

This closes #2235 from rdhabalia/bk_client_cert
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
index 77e8dc9..b0bc57f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
@@ -83,8 +83,8 @@
     private volatile SslContext sslContext;
     private ByteBufAllocator allocator;
     private AbstractConfiguration config;
-    private FileModifiedTimeUpdater tTLSCertificatePath, tLSKeyStoreFilePath, tLSKeyStorePasswordFilePath,
-            tLSTrustStoreFilePath, tLSTrustStorePasswordFilePath;
+    private FileModifiedTimeUpdater tlsCertificateFilePath, tlsKeyStoreFilePath, tlsKeyStorePasswordFilePath,
+            tlsTrustStoreFilePath, tlsTrustStorePasswordFilePath;
     private long certRefreshTime;
     private volatile long certLastRefreshTime;
     private boolean isServerCtx;
@@ -183,6 +183,16 @@
     private void createClientContext()
             throws SecurityException, KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException,
             UnrecoverableKeyException, InvalidKeySpecException, NoSuchProviderException {
+        ClientConfiguration clientConf = (ClientConfiguration) config;
+        markAutoCertRefresh(clientConf.getTLSCertificatePath(), clientConf.getTLSKeyStore(),
+                clientConf.getTLSKeyStorePasswordPath(), clientConf.getTLSTrustStore(),
+                clientConf.getTLSTrustStorePasswordPath());
+        updateClientContext();
+    }
+
+    private synchronized void updateClientContext()
+            throws SecurityException, KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException,
+            UnrecoverableKeyException, InvalidKeySpecException, NoSuchProviderException {
         final SslContextBuilder sslContextBuilder;
         final ClientConfiguration clientConf;
         final SslProvider provider;
@@ -267,29 +277,35 @@
         }
 
         sslContext = sslContextBuilder.build();
+        certLastRefreshTime = System.currentTimeMillis();
     }
 
     private void createServerContext()
             throws SecurityException, KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException,
             UnrecoverableKeyException, InvalidKeySpecException, IllegalArgumentException {
         isServerCtx = true;
-        ServerConfiguration serverConf = (ServerConfiguration) config;
-        tTLSCertificatePath = new FileModifiedTimeUpdater(serverConf.getTLSCertificatePath());
-        tLSKeyStoreFilePath = new FileModifiedTimeUpdater(serverConf.getTLSKeyStore());
-        tLSKeyStorePasswordFilePath = new FileModifiedTimeUpdater(serverConf.getTLSKeyStorePasswordPath());
-        tLSTrustStoreFilePath = new FileModifiedTimeUpdater(serverConf.getTLSTrustStore());
-        tLSTrustStorePasswordFilePath = new FileModifiedTimeUpdater(serverConf.getTLSTrustStorePasswordPath());
+        ServerConfiguration clientConf = (ServerConfiguration) config;
+        markAutoCertRefresh(clientConf.getTLSCertificatePath(), clientConf.getTLSKeyStore(),
+                clientConf.getTLSKeyStorePasswordPath(), clientConf.getTLSTrustStore(),
+                clientConf.getTLSTrustStorePasswordPath());
         updateServerContext();
     }
 
     private synchronized SslContext getSSLContext() {
         long now = System.currentTimeMillis();
-        if (isServerCtx && (certRefreshTime > 0 && now > (certLastRefreshTime + certRefreshTime))) {
-            if (tTLSCertificatePath.checkAndRefresh() || tLSKeyStoreFilePath.checkAndRefresh()
-                    || tLSKeyStorePasswordFilePath.checkAndRefresh() || tLSTrustStoreFilePath.checkAndRefresh()
-                    || tLSTrustStorePasswordFilePath.checkAndRefresh()) {
+        if ((certRefreshTime > 0 && now > (certLastRefreshTime + certRefreshTime))) {
+            if (tlsCertificateFilePath.checkAndRefresh() || tlsKeyStoreFilePath.checkAndRefresh()
+                    || tlsKeyStorePasswordFilePath.checkAndRefresh() || tlsTrustStoreFilePath.checkAndRefresh()
+                    || tlsTrustStorePasswordFilePath.checkAndRefresh()) {
                 try {
-                    updateServerContext();
+                    LOG.info("Updating tls certs certFile={}, keyStoreFile={}, trustStoreFile={}",
+                            tlsCertificateFilePath.getFileName(), tlsKeyStoreFilePath.getFileName(),
+                            tlsTrustStoreFilePath.getFileName());
+                    if (isServerCtx) {
+                        updateServerContext();
+                    } else {
+                        updateClientContext();
+                    }
                 } catch (Exception e) {
                     LOG.info("Failed to refresh tls certs", e);
                 }
@@ -458,4 +474,13 @@
 
         return sslHandler;
     }
+
+    private void markAutoCertRefresh(String tlsCertificatePath, String tlsKeyStore, String tlsKeyStorePasswordPath,
+            String tlsTrustStore, String tlsTrustStorePasswordPath) {
+        tlsCertificateFilePath = new FileModifiedTimeUpdater(tlsCertificatePath);
+        tlsKeyStoreFilePath = new FileModifiedTimeUpdater(tlsKeyStore);
+        tlsKeyStorePasswordFilePath = new FileModifiedTimeUpdater(tlsKeyStorePasswordPath);
+        tlsTrustStoreFilePath = new FileModifiedTimeUpdater(tlsTrustStore);
+        tlsTrustStorePasswordFilePath = new FileModifiedTimeUpdater(tlsTrustStorePasswordPath);
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index 68d8662..94dec72 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -36,6 +36,8 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.auth.AuthCallbacks;
 import org.apache.bookkeeper.auth.AuthToken;
@@ -387,6 +389,67 @@
     }
 
     /**
+     * Verify Bookkeeper-client refreshes certs at configured duration.
+     */
+    @Test
+    public void testRefreshDurationForBookkeeperClientCerts() throws Exception {
+        Assume.assumeTrue(serverKeyStoreFormat == KeyStoreType.PEM);
+
+        ClientConfiguration clientConf = new ClientConfiguration(baseClientConf);
+        String originalTlsCertFilePath = baseClientConf.getTLSCertificatePath();
+        String invalidClientCert = getResourcePath("server-cert.pem");
+        File originalTlsCertFile = new File(originalTlsCertFilePath);
+        File newTlsCertFile = IOUtils.createTempFileAndDeleteOnExit(originalTlsCertFilePath, "refresh");
+        // clean up temp file even if test fails
+        newTlsCertFile.deleteOnExit();
+        File invalidClientCertFile = new File(invalidClientCert);
+        // copy invalid cert to new temp file
+        FileUtils.copyFile(invalidClientCertFile, newTlsCertFile);
+        long refreshDurationInSec = 2;
+        clientConf.setTLSCertFilesRefreshDurationSeconds(1);
+        clientConf.setTLSCertificatePath(newTlsCertFile.getAbsolutePath());
+
+        // create a bookkeeper-client
+        try (BookKeeper client = new BookKeeper(clientConf)) {
+            byte[] testEntry = "testEntry".getBytes();
+            byte[] passwd = "testPassword".getBytes();
+            int totalAddEntries = 1;
+            CountDownLatch latch = new CountDownLatch(totalAddEntries);
+            AtomicInteger result = new AtomicInteger(-1);
+            LedgerHandle lh = client.createLedger(1, 1, DigestType.CRC32, passwd);
+
+            for (int i = 0; i <= totalAddEntries; i++) {
+                lh.asyncAddEntry(testEntry, (rc, lgh, entryId, ctx) -> {
+                    result.set(rc);
+                    latch.countDown();
+                }, null);
+            }
+            latch.await(1, TimeUnit.SECONDS);
+            Assert.assertNotEquals(result.get(), BKException.Code.OK);
+
+            // Sleep so, cert file can be refreshed
+            Thread.sleep(refreshDurationInSec * 1000 + 1000);
+
+            // copy valid key-file at given new location
+            FileUtils.copyFile(originalTlsCertFile, newTlsCertFile);
+            newTlsCertFile.setLastModified(System.currentTimeMillis() + 1000);
+            // client should be successfully able to add entries over tls
+            CountDownLatch latchWithValidCert = new CountDownLatch(totalAddEntries);
+            AtomicInteger validCertResult = new AtomicInteger(-1);
+            lh = client.createLedger(1, 1, DigestType.CRC32, passwd);
+            for (int i = 0; i <= totalAddEntries; i++) {
+                lh.asyncAddEntry(testEntry, (rc, lgh, entryId, ctx) -> {
+                    validCertResult.set(rc);
+                    latchWithValidCert.countDown();
+                }, null);
+            }
+            latchWithValidCert.await(1, TimeUnit.SECONDS);
+            Assert.assertEquals(validCertResult.get(), BKException.Code.OK);
+            newTlsCertFile.delete();
+        }
+    }
+
+    /**
      * Multiple clients, some with TLS, and some without TLS.
      */
     @Test