BP-41 Bookie Address changes tracking

- Allow the client to follow Bookie Address Changes
- If a Bookie changes network address the clients will be able to connect to the new addresess
- In order to leverage the most of this feature the Bookie must configure a fixed bookieId (see BP-41) 
- add flag enableBookieAddressTracking to enable/disable this feature, as it needs a ZK watch for every active bookie in the cluster and if you have a static cluster there is no need to invalidate the local cache and issue more ZK requests
- handle ZK session expiration events, when the ZK client is not receiving watch notifications it could miss bookie address changes, so we have to eagerly flush the whole local cache of bookie addresses


This is part of BP-41
Master ticket #2396


Reviewers: Nicolò Boschi <boschi1997@gmail.com>, Anup Ghatage <ghatage@apache.org>

This closes #2435 from eolivelli/fix/bp41-client-discovery-cache
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 663ad63..76ef7bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -168,6 +168,9 @@
     protected static final String ENABLE_BOOKIE_FAILURE_TRACKING = "enableBookieFailureTracking";
     protected static final String BOOKIE_FAILURE_HISTORY_EXPIRATION_MS = "bookieFailureHistoryExpirationMSec";
 
+    // Discovery
+    protected static final String FOLLOW_BOOKIE_ADDRESS_TRACKING = "enableBookieAddressTracking";
+
     // Names of dynamic features
     protected static final String DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME = "disableEnsembleChangeFeatureName";
 
@@ -1765,6 +1768,27 @@
     }
 
     /**
+     * Whether to enable bookie address changes tracking.
+     *
+     * @return flag to enable/disable bookie address changes tracking
+     */
+    public boolean getEnableBookieAddressTracking() {
+        return getBoolean(FOLLOW_BOOKIE_ADDRESS_TRACKING, true);
+    }
+
+    /**
+     * Enable/Disable bookie address changes tracking.
+     *
+     * @param value
+     *          flag to enable/disable bookie address changes tracking
+     * @return client configuration.
+     */
+    public ClientConfiguration setEnableBookieAddressTracking(boolean value) {
+        setProperty(FOLLOW_BOOKIE_ADDRESS_TRACKING, value);
+        return this;
+    }
+
+    /**
      * Whether to enable bookie failure tracking.
      *
      * @return flag to enable/disable bookie failure tracking
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index dd9bac1..9951c45 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -182,7 +182,8 @@
     private WatchTask watchReadOnlyBookiesTask = null;
     private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
                                                                             new ConcurrentHashMap<>();
-
+    private final Watcher bookieServiceInfoCacheInvalidation;
+    private final boolean bookieAddressTracking;
     // registration paths
     private final String bookieRegistrationPath;
     private final String bookieAllRegistrationPath;
@@ -190,10 +191,17 @@
 
     public ZKRegistrationClient(ZooKeeper zk,
                                 String ledgersRootPath,
-                                ScheduledExecutorService scheduler) {
+                                ScheduledExecutorService scheduler,
+                                boolean bookieAddressTracking) {
         this.zk = zk;
         this.scheduler = scheduler;
-
+        // Following Bookie Network Address Changes is an expensive operation
+        // as it requires additional ZooKeeper watches
+        // we can disable this feature, in case the BK cluster has only
+        // static addresses
+        this.bookieAddressTracking = bookieAddressTracking;
+        this.bookieServiceInfoCacheInvalidation = bookieAddressTracking
+                                                    ? new BookieServiceInfoCacheInvalidationWatcher() : null;
         this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
         this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
@@ -204,6 +212,10 @@
         // no-op
     }
 
+    public boolean isBookieAddressTracking() {
+        return bookieAddressTracking;
+    }
+
     public ZooKeeper getZk() {
         return zk;
     }
@@ -243,12 +255,13 @@
      * @param bookieId
      * @return an handle to the result of the operation.
      */
-    private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfo(BookieId bookieId) {
+    private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
         String pathAsWritable = bookieRegistrationPath + "/" + bookieId;
         String pathAsReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
 
         CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
-        zk.getData(pathAsWritable, null, (int rc, String path, Object o, byte[] bytes, Stat stat) -> {
+        zk.getData(pathAsWritable, bookieServiceInfoCacheInvalidation,
+                (int rc, String path, Object o, byte[] bytes, Stat stat) -> {
             if (KeeperException.Code.OK.intValue() == rc) {
                 try {
                     BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes);
@@ -265,7 +278,8 @@
                 }
             } else if (KeeperException.Code.NONODE.intValue() == rc) {
                 // not found, looking for a readonly bookie
-                zk.getData(pathAsReadonly, null, (int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
+                zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation,
+                        (int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
                     if (KeeperException.Code.OK.intValue() == rc2) {
                         try {
                             BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes2);
@@ -344,7 +358,7 @@
             for (BookieId id : bookies) {
                 // update the cache for new bookies
                 if (!bookieServiceInfoCache.containsKey(id)) {
-                    bookieInfoUpdated.add(readBookieServiceInfo(id));
+                    bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
                 }
             }
             if (bookieInfoUpdated.isEmpty()) {
@@ -447,4 +461,46 @@
         return newBookieAddrs;
     }
 
+    private static BookieId stripBookieIdFromPath(String path) {
+        final int slash = path.lastIndexOf('/');
+        if (slash >= 0) {
+            try {
+                return BookieId.parse(path.substring(slash + 1));
+            } catch (IllegalArgumentException e) {
+                log.warn("Cannot decode bookieId from {}", path, e);
+            }
+        }
+        return null;
+    }
+
+    private class BookieServiceInfoCacheInvalidationWatcher implements Watcher {
+
+        @Override
+        public void process(WatchedEvent we) {
+            log.debug("zk event {} for {} state {}", we.getType(), we.getPath(), we.getState());
+            if (we.getState() == KeeperState.Expired) {
+                log.info("zk session expired, invalidating cache");
+                bookieServiceInfoCache.clear();
+                return;
+            }
+            BookieId bookieId = stripBookieIdFromPath(we.getPath());
+            if (bookieId == null) {
+                return;
+            }
+            switch (we.getType()) {
+                case NodeDeleted:
+                    log.info("Invalidate cache for {}", bookieId);
+                    bookieServiceInfoCache.remove(bookieId);
+                    break;
+                case NodeDataChanged:
+                    log.info("refresh cache for {}", bookieId);
+                    readBookieServiceInfoAsync(bookieId);
+                    break;
+                default:
+                    log.debug("ignore cache event {} for {}", we.getType(), bookieId);
+                    break;
+            }
+        }
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index e8eea49..df5499b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -515,7 +515,8 @@
         try (RegistrationClient regClient = new ZKRegistrationClient(
             zk,
             ledgersRootPath,
-            null
+            null,
+            false
         )) {
             if (availableNodeExists) {
                 Collection<BookieId> rwBookies = FutureUtils
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
index 99b9427..638f390 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
@@ -51,6 +51,7 @@
     ClientConfiguration clientConf;
     ScheduledExecutorService scheduler;
     RegistrationClient regClient;
+    boolean bookieAddressTracking = true;
 
     @Override
     public synchronized MetadataClientDriver initialize(ClientConfiguration conf,
@@ -70,6 +71,7 @@
         this.statsLogger = statsLogger;
         this.clientConf = conf;
         this.scheduler = scheduler;
+        this.bookieAddressTracking = conf.getEnableBookieAddressTracking();
         return this;
     }
 
@@ -79,7 +81,8 @@
             regClient = new ZKRegistrationClient(
                 zk,
                 ledgersRootPath,
-                scheduler);
+                scheduler,
+                bookieAddressTracking);
         }
         return regClient;
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieNetworkAddressChangeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieNetworkAddressChangeTest.java
new file mode 100644
index 0000000..977eb8c
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieNetworkAddressChangeTest.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
+import org.apache.bookkeeper.client.api.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.ZKRegistrationClient;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.PortManager;
+import org.junit.Test;
+
+/**
+ * Tests of the main BookKeeper client and the BP-41 bookieAddressTracking feature.
+ */
+@Slf4j
+public class BookieNetworkAddressChangeTest extends BookKeeperClusterTestCase {
+
+    public BookieNetworkAddressChangeTest() {
+        super(1);
+        this.useUUIDasBookieId = true;
+    }
+
+    @Test
+    public void testFollowBookieAddressChange() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        try (BookKeeper bkc = BookKeeper.newBuilder(conf)
+                        .build();) {
+            long lId;
+            try (WriteHandle h = bkc
+                    .newCreateLedgerOp()
+                    .withAckQuorumSize(1)
+                    .withEnsembleSize(1)
+                    .withWriteQuorumSize(1)
+                    .withPassword(new byte[0])
+                    .execute()
+                    .get();) {
+                lId = h.getId();
+                h.append("foo".getBytes("utf-8"));
+            }
+
+            // restart bookie, change port
+            // on metadata we have a bookieId, not the network address
+            ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
+            thisServerConf.setBookiePort(PortManager.nextFreePort());
+            restartBookies(thisServerConf);
+
+            try (ReadHandle h = bkc
+                    .newOpenLedgerOp()
+                    .withLedgerId(lId)
+                    .withRecovery(true)
+                    .withPassword(new byte[0])
+                    .execute()
+                    .get()) {
+                assertEquals(0, h.getLastAddConfirmed());
+                try (LedgerEntries entries = h.read(0, 0);) {
+                    assertEquals("foo", new String(entries.getEntry(0).getEntryBytes(), "utf-8"));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testFollowBookieAddressChangeTrckingDisabled() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        conf.setEnableBookieAddressTracking(false);
+        try (BookKeeper bkc = BookKeeper.newBuilder(conf)
+                        .build();) {
+            long lId;
+            try (WriteHandle h = bkc
+                    .newCreateLedgerOp()
+                    .withAckQuorumSize(1)
+                    .withEnsembleSize(1)
+                    .withWriteQuorumSize(1)
+                    .withPassword(new byte[0])
+                    .execute()
+                    .get();) {
+                lId = h.getId();
+                h.append("foo".getBytes("utf-8"));
+            }
+
+            // restart bookie, change port
+            // on metadata we have a bookieId, not the network address
+            ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
+            thisServerConf.setBookiePort(PortManager.nextFreePort());
+            restartBookies(thisServerConf);
+
+            try (ReadHandle h = bkc
+                    .newOpenLedgerOp()
+                    .withLedgerId(lId)
+                    .withRecovery(true)
+                    .withPassword(new byte[0])
+                    .execute()
+                    .get()) {
+                try (LedgerEntries entries = h.read(0, 0);) {
+                    fail("Should not be able to connect to the bookie with Bookie Address Tracking Disabled");
+                } catch (BKBookieHandleNotAvailableException expected) {
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testFollowBookieAddressChangeZkSessionExpire() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        try (BookKeeper bkc = BookKeeper.newBuilder(conf)
+                        .build();) {
+            long lId;
+            try (WriteHandle h = bkc
+                    .newCreateLedgerOp()
+                    .withAckQuorumSize(1)
+                    .withEnsembleSize(1)
+                    .withWriteQuorumSize(1)
+                    .withPassword(new byte[0])
+                    .execute()
+                    .get();) {
+                lId = h.getId();
+                h.append("foo".getBytes("utf-8"));
+            }
+
+            log.error("expiring ZK session!");
+            // expire zk session
+            ZKRegistrationClient regClient = (ZKRegistrationClient) ((org.apache.bookkeeper.client.BookKeeper) bkc)
+                    .getMetadataClientDriver()
+                    .getRegistrationClient();
+
+            regClient.getZk().getTestable().injectSessionExpiration();
+
+            // restart bookie, change port
+            // on metadata we have a bookieId, not the network address
+            ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
+            thisServerConf.setBookiePort(PortManager.nextFreePort());
+            restartBookies(thisServerConf);
+
+            try (ReadHandle h = bkc
+                    .newOpenLedgerOp()
+                    .withLedgerId(lId)
+                    .withRecovery(true)
+                    .withPassword(new byte[0])
+                    .execute()
+                    .get()) {
+                assertEquals(0, h.getLastAddConfirmed());
+                try (LedgerEntries entries = h.read(0, 0);) {
+                    assertEquals("foo", new String(entries.getEntry(0).getEntryBytes(), "utf-8"));
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/AbstractTestZkRegistrationClient.java
similarity index 96%
rename from bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
rename to bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/AbstractTestZkRegistrationClient.java
index a8d4f1d..e09a635 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/AbstractTestZkRegistrationClient.java
@@ -83,7 +83,9 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ ZKRegistrationClient.class, ZkUtils.class })
 @Slf4j
-public class TestZkRegistrationClient extends MockZooKeeperTestCase {
+public abstract class AbstractTestZkRegistrationClient extends MockZooKeeperTestCase {
+
+
 
     @Rule
     public final TestName runtime = new TestName();
@@ -96,6 +98,13 @@
     private ScheduledExecutorService mockExecutor;
     private MockExecutorController controller;
 
+    private final boolean bookieAddressChangeTracking;
+
+    public AbstractTestZkRegistrationClient(boolean bookieAddressChangeTracking) {
+        this.bookieAddressChangeTracking = bookieAddressChangeTracking;
+    }
+
+
     @Override
     @Before
     public void setup() throws Exception {
@@ -114,8 +123,10 @@
         this.zkRegistrationClient = new ZKRegistrationClient(
             mockZk,
             ledgersPath,
-            mockExecutor
+            mockExecutor,
+            bookieAddressChangeTracking
         );
+        assertEquals(bookieAddressChangeTracking, zkRegistrationClient.isBookieAddressTracking());
     }
 
     @After
@@ -136,23 +147,23 @@
     private void prepareReadBookieServiceInfo(BookieId address, boolean readonly) throws Exception {
         if (readonly) {
             mockZkGetData(regPath + "/" + address.toString(),
-                        false,
+                        zkRegistrationClient.isBookieAddressTracking(),
                         Code.NONODE.intValue(),
                         new byte[] {},
                         new Stat());
             mockZkGetData(regReadonlyPath + "/" + address.toString(),
-                        false,
+                        zkRegistrationClient.isBookieAddressTracking(),
                         Code.OK.intValue(),
                         new byte[] {},
                         new Stat());
         } else {
             mockZkGetData(regPath + "/" + address.toString(),
-                        false,
+                        zkRegistrationClient.isBookieAddressTracking(),
                         Code.OK.intValue(),
                         new byte[] {},
                         new Stat());
             mockZkGetData(regReadonlyPath + "/" + address.toString(),
-                        false,
+                        zkRegistrationClient.isBookieAddressTracking(),
                         Code.NONODE.intValue(),
                         new byte[] {},
                         new Stat());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClientWithBookieAddressTracking.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClientWithBookieAddressTracking.java
new file mode 100644
index 0000000..f9b6de3
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClientWithBookieAddressTracking.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+/**
+ * Unit test of {@link RegistrationClient} with Bookie Address Tracking feature.
+ */
+public class TestZkRegistrationClientWithBookieAddressTracking extends AbstractTestZkRegistrationClient {
+
+    public TestZkRegistrationClientWithBookieAddressTracking() {
+        super(true);
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClientWithoutBookieAddressTracking.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClientWithoutBookieAddressTracking.java
new file mode 100644
index 0000000..39ed8ea
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClientWithoutBookieAddressTracking.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.bookkeeper.discover;
+
+/**
+ * Unit test of {@link RegistrationClient} without Bookie Address Tracking feature.
+ */
+public class TestZkRegistrationClientWithoutBookieAddressTracking extends AbstractTestZkRegistrationClient {
+
+    public TestZkRegistrationClientWithoutBookieAddressTracking() {
+        super(false);
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java
index 626a055..75aadef 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -72,8 +73,8 @@
         ZKRegistrationClient mockRegClient = PowerMockito.mock(ZKRegistrationClient.class);
 
         PowerMockito.whenNew(ZKRegistrationClient.class)
-            .withParameterTypes(ZooKeeper.class, String.class, ScheduledExecutorService.class)
-            .withArguments(any(ZooKeeper.class), anyString(), any(ScheduledExecutorService.class))
+            .withParameterTypes(ZooKeeper.class, String.class, ScheduledExecutorService.class, Boolean.TYPE)
+            .withArguments(any(ZooKeeper.class), anyString(), any(ScheduledExecutorService.class), anyBoolean())
             .thenReturn(mockRegClient);
 
         RegistrationClient client = driver.getRegistrationClient();
@@ -81,7 +82,7 @@
         assertSame(mockRegClient, driver.regClient);
 
         PowerMockito.verifyNew(ZKRegistrationClient.class, times(1))
-            .withArguments(eq(mockZkc), eq(ledgersRootPath), eq(mockExecutor));
+            .withArguments(eq(mockZkc), eq(ledgersRootPath), eq(mockExecutor), anyBoolean());
 
         driver.close();
         verify(mockRegClient, times(1)).close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 2156859..a52a304 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -614,12 +614,17 @@
         Thread.sleep(1000);
         // restart them to ensure we can't
         for (ServerConfiguration conf : bsConfs) {
-            // ensure the bookie port is loaded correctly
+            String bookieId = conf.getBookieId();
+            // ensure the bookie id or port is loaded correctly
             int port = conf.getBookiePort();
             if (null != newConf) {
                 conf.loadConf(newConf);
             }
-            conf.setBookiePort(port);
+            if (bookieId != null) {
+                conf.setBookieId(bookieId);
+            } else {
+                conf.setBookiePort(port);
+            }
             bs.add(startBookie(conf));
         }
     }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
index f1cda91..03bf23d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
@@ -30,6 +30,7 @@
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.discover.ZKRegistrationClient;
@@ -52,6 +53,7 @@
     private final RetryPolicy bkZkRetryPolicy;
     private final String regPath;
     private final ScheduledExecutorService regExecutor;
+    private final boolean bookieAddresschangeTracking;
     private ZooKeeperClient zkClient;
     private RegistrationClient client;
 
@@ -67,6 +69,8 @@
             Integer.MAX_VALUE);
         this.regExecutor = Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("registration-service-provider-scheduler").build());
+        ClientConfiguration clientConfiguration = new ClientConfiguration(bkServerConf);
+        this.bookieAddresschangeTracking = clientConfiguration.getEnableBookieAddressTracking();
     }
 
     @Override
@@ -101,7 +105,7 @@
                 log.error("Failed to create zookeeper client to {}", zkServers, e);
                 throw e;
             }
-            client = new ZKRegistrationClient(zkClient, regPath, regExecutor);
+            client = new ZKRegistrationClient(zkClient, regPath, regExecutor, bookieAddresschangeTracking);
         }
     }