[fix] [broker] Fix configurationMetadataSyncEventTopic is marked supporting dynamic setting, but not implemented  (#22684)

(cherry picked from commit ff4853e06259d2c278d76d393dd9b650ad3edf4a)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 93b7efc..2a17607 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -568,13 +568,12 @@
                 }
             }
 
-            closeLocalMetadataStore();
+            asyncCloseFutures.add(closeLocalMetadataStore());
+            if (configMetadataSynchronizer != null) {
+                asyncCloseFutures.add(configMetadataSynchronizer.closeAsync());
+            }
             if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore) {
                 configurationMetadataStore.close();
-                if (configMetadataSynchronizer != null) {
-                    configMetadataSynchronizer.close();
-                    configMetadataSynchronizer = null;
-                }
             }
 
             if (transactionExecutorProvider != null) {
@@ -1114,14 +1113,16 @@
                         .build());
     }
 
-    protected void closeLocalMetadataStore() throws Exception {
+    protected CompletableFuture<Void> closeLocalMetadataStore() throws Exception {
         if (localMetadataStore != null) {
             localMetadataStore.close();
         }
         if (localMetadataSynchronizer != null) {
-            localMetadataSynchronizer.close();
+            CompletableFuture<Void> closeSynchronizer = localMetadataSynchronizer.closeAsync();
             localMetadataSynchronizer = null;
+            return closeSynchronizer;
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     protected void startLeaderElectionService() {
@@ -1935,4 +1936,69 @@
     protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
         return new BrokerService(pulsar, ioEventLoopGroup);
     }
+
+    public void initConfigMetadataSynchronizerIfNeeded() {
+        mutex.lock();
+        try {
+            final String newTopic = config.getConfigurationMetadataSyncEventTopic();
+            final PulsarMetadataEventSynchronizer oldSynchronizer = configMetadataSynchronizer;
+            // Skip if not support.
+            if (!(configurationMetadataStore instanceof MetadataStoreExtended)) {
+                LOG.info(
+                        "Skip to update Metadata Synchronizer because of the Configuration Metadata Store using[{}]"
+                                + " does not support.", configurationMetadataStore.getClass().getName());
+                return;
+            }
+            // Skip if no changes.
+            //   case-1: both null.
+            //   case-2: both topics are the same.
+            if ((oldSynchronizer == null && StringUtils.isBlank(newTopic))) {
+                LOG.info("Skip to update Metadata Synchronizer because the topic[null] does not changed.");
+            }
+            if (StringUtils.isNotBlank(newTopic) && oldSynchronizer != null) {
+                TopicName newTopicName = TopicName.get(newTopic);
+                TopicName oldTopicName = TopicName.get(oldSynchronizer.getTopicName());
+                if (newTopicName.equals(oldTopicName)) {
+                    LOG.info("Skip to update Metadata Synchronizer because the topic[{}] does not changed.",
+                            oldTopicName);
+                }
+            }
+            // Update(null or not null).
+            //   1.set the new one.
+            //   2.close the old one.
+            //   3.async start the new one.
+            if (StringUtils.isBlank(newTopic)) {
+                configMetadataSynchronizer = null;
+            } else {
+                configMetadataSynchronizer = new PulsarMetadataEventSynchronizer(this, newTopic);
+            }
+            // close the old one and start the new one.
+            PulsarMetadataEventSynchronizer newSynchronizer = configMetadataSynchronizer;
+            MetadataStoreExtended metadataStoreExtended = (MetadataStoreExtended) configurationMetadataStore;
+            metadataStoreExtended.updateMetadataEventSynchronizer(newSynchronizer);
+            Runnable startNewSynchronizer = () -> {
+                if (newSynchronizer == null) {
+                    return;
+                }
+                try {
+                    newSynchronizer.start();
+                } catch (Exception e) {
+                    // It only occurs when get internal client fails.
+                    LOG.error("Start Metadata Synchronizer with topic {} failed.",
+                            newTopic, e);
+                }
+            };
+            executor.submit(() -> {
+                if (oldSynchronizer != null) {
+                    oldSynchronizer.closeAsync().whenComplete((ignore, ex) -> {
+                        startNewSynchronizer.run();
+                    });
+                } else {
+                    startNewSynchronizer.run();
+                }
+            });
+        } finally {
+            mutex.unlock();
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bcfe531..2f3f256 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2827,6 +2827,11 @@
             pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled((boolean) enabled);
         });
 
+        // add listener to notify web service httpRequestsFailOnUnknownPropertiesEnabled changed.
+        registerConfigurationListener("configurationMetadataSyncEventTopic", enabled -> {
+            pulsar.initConfigMetadataSynchronizerIfNeeded();
+        });
+
         // add more listeners here
 
         // (3) create dynamic-config if not exist.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
index 0383a0b..8b2ebf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -19,11 +19,15 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -46,6 +50,7 @@
     private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
     protected PulsarService pulsar;
     protected BrokerService brokerService;
+    @Getter
     protected String topicName;
     protected PulsarClientImpl client;
     protected volatile Producer<MetadataEvent> producer;
@@ -53,19 +58,32 @@
     private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>>
     listeners = new CopyOnWriteArrayList<>();
 
-    private volatile boolean started = false;
+    static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state");
+    @Getter
+    private volatile State state;
     public static final String SUBSCRIPTION_NAME = "metadata-syncer";
     private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
     protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0,
             TimeUnit.MILLISECONDS);
+    private volatile CompletableFuture<Void> closeFuture;
 
-    public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) throws PulsarServerException {
+    public enum State {
+        Init,
+        Starting_Producer,
+        Starting_Consumer,
+        Started,
+        Closing,
+        Closed;
+    }
+
+    public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) {
         this.pulsar = pulsar;
         this.brokerService = pulsar.getBrokerService();
         this.topicName = topicName;
+        this.state = State.Init;
         if (!StringUtils.isNotBlank(topicName)) {
             log.info("Metadata synchronizer is disabled");
-            return;
         }
     }
 
@@ -74,10 +92,11 @@
             log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
             return;
         }
+        log.info("Metadata event synchronizer is starting on topic {}", topicName);
         this.client = (PulsarClientImpl) pulsar.getClient();
-        startProducer();
-        startConsumer();
-        log.info("Metadata event synchronizer started on topic {}", topicName);
+        if (STATE_UPDATER.compareAndSet(this, State.Init, State.Starting_Producer)) {
+            startProducer();
+        }
     }
 
     @Override
@@ -98,7 +117,7 @@
     }
 
     private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
-        if (!started) {
+        if (!isProducerStarted()) {
             log.info("Producer is not started on {}, failed to publish {}", topicName, event);
             future.completeExceptionally(new IllegalStateException("producer is not started yet"));
         }
@@ -114,62 +133,100 @@
     }
 
     private void startProducer() {
+        if (isClosingOrClosed()) {
+            log.info("[{}] Skip to start new producer because the synchronizer is closed", topicName);
+        }
+        if (producer != null) {
+            log.error("[{}] Failed to start the producer because the producer has been set, state: {}",
+                    topicName, state);
+            return;
+        }
         log.info("[{}] Starting producer", topicName);
         client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(topicName)
-                .messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
-                .sendTimeout(0, TimeUnit.SECONDS) //
-                .maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> {
+            .messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
+            .sendTimeout(0, TimeUnit.SECONDS) //
+            .maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> {
+                backOff.reset();
+                if (STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Starting_Consumer)) {
                     producer = prod;
-                    started = true;
                     log.info("producer is created successfully {}", topicName);
-                }).exceptionally(ex -> {
-                    long waitTimeMs = backOff.next();
-                    log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
-                            waitTimeMs / 1000.0);
-                    // BackOff before retrying
-                    brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
-                    return null;
-                });
+                    PulsarMetadataEventSynchronizer.this.startConsumer();
+                } else {
+                    State stateTransient = state;
+                    log.info("[{}] Closing the new producer because the synchronizer state is {}", prod,
+                            stateTransient);
+                    CompletableFuture closeProducer = new CompletableFuture<>();
+                    closeResource(() -> prod.closeAsync(), closeProducer);
+                    closeProducer.thenRun(() -> {
+                        log.info("[{}] Closed the new producer because the synchronizer state is {}", prod,
+                                stateTransient);
+                    });
+                }
+            }).exceptionally(ex -> {
+                long waitTimeMs = backOff.next();
+                log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
+                        waitTimeMs / 1000.0);
+                // BackOff before retrying
+                brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
+                return null;
+            });
     }
 
     private void startConsumer() {
+        if (isClosingOrClosed()) {
+            log.info("[{}] Skip to start new consumer because the synchronizer is closed", topicName);
+        }
         if (consumer != null) {
+            log.error("[{}] Failed to start the consumer because the consumer has been set, state: {}",
+                    topicName, state);
             return;
         }
+        log.info("[{}] Starting consumer", topicName);
         ConsumerBuilder<MetadataEvent> consumerBuilder = client.newConsumer(Schema.AVRO(MetadataEvent.class))
-                .topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60, TimeUnit.SECONDS)
-                .subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> {
-                    log.info("Processing metadata event for {} with listeners {}", msg.getValue().getPath(),
-                            listeners.size());
-                    try {
-                        if (listeners.size() == 0) {
-                            c.acknowledgeAsync(msg);
-                            return;
+            .topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60, TimeUnit.SECONDS)
+            .subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> {
+                log.info("Processing metadata event for {} with listeners {}", msg.getValue().getPath(),
+                        listeners.size());
+                try {
+                    if (listeners.size() == 0) {
+                        c.acknowledgeAsync(msg);
+                        return;
 
-                        }
-                        if (listeners.size() == 1) {
-                            listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))
-                                    .exceptionally(ex -> {
-                                        log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName,
-                                                ex.getCause());
-                                        return null;
-                                    });
-                        } else {
-                            FutureUtil
-                                    .waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue()))
-                                            .collect(Collectors.toList()))
-                                    .thenApply(__ -> c.acknowledgeAsync(msg)).exceptionally(ex -> {
-                                        log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName);
-                                        return null;
-                                    });
-                        }
-                    } catch (Exception e) {
-                        log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName);
                     }
-                });
+                    if (listeners.size() == 1) {
+                        listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))
+                                .exceptionally(ex -> {
+                                    log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName,
+                                            ex.getCause());
+                                    return null;
+                                });
+                    } else {
+                        FutureUtil
+                                .waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue()))
+                                        .collect(Collectors.toList()))
+                                .thenApply(__ -> c.acknowledgeAsync(msg)).exceptionally(ex -> {
+                                    log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName);
+                                    return null;
+                                });
+                    }
+                } catch (Exception e) {
+                    log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName);
+                }
+            });
         consumerBuilder.subscribeAsync().thenAccept(consumer -> {
-            log.info("successfully created consumer {}", topicName);
-            this.consumer = consumer;
+            backOff.reset();
+            if (STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Started)) {
+                this.consumer = consumer;
+                log.info("successfully created consumer {}", topicName);
+            } else {
+                State stateTransient = state;
+                log.info("[{}] Closing the new consumer because the synchronizer state is {}", stateTransient);
+                CompletableFuture closeConsumer = new CompletableFuture<>();
+                closeResource(() -> consumer.closeAsync(), closeConsumer);
+                closeConsumer.thenRun(() -> {
+                    log.info("[{}] Closed the new consumer because the synchronizer state is {}", stateTransient);
+                });
+            }
         }).exceptionally(ex -> {
             long waitTimeMs = backOff.next();
             log.warn("[{}] Failed to create consumer ({}), retrying in {} s", topicName, ex.getMessage(),
@@ -181,19 +238,81 @@
     }
 
     public boolean isStarted() {
-        return started;
+        return this.state == State.Started;
+    }
+
+    public boolean isProducerStarted() {
+        return this.state.ordinal() > State.Starting_Producer.ordinal()
+                && this.state.ordinal() < State.Closing.ordinal();
+    }
+
+    public boolean isClosingOrClosed() {
+        return this.state == State.Closing || this.state == State.Closed;
     }
 
     @Override
-    public void close() {
-        started = false;
-        if (producer != null) {
-            producer.closeAsync();
-            producer = null;
+    public synchronized CompletableFuture<Void> closeAsync() {
+        int tryChangeStateCounter = 0;
+        while (true) {
+            if (isClosingOrClosed()) {
+                return closeFuture;
+            }
+            if (STATE_UPDATER.compareAndSet(this, State.Init, State.Closing)
+                || STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Closing)
+                || STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Closing)
+                || STATE_UPDATER.compareAndSet(this, State.Started, State.Closing)) {
+                break;
+            }
+            // Just for avoid spinning loop which would cause 100% CPU consumption here.
+            if (++tryChangeStateCounter > 100) {
+                log.error("Unexpected error: the state can not be changed to closing {}, state: {}", topicName, state);
+                return CompletableFuture.failedFuture(new RuntimeException("Unexpected error,"
+                        + " the state can not be changed to closing"));
+            }
         }
-        if (consumer != null) {
-            consumer.closeAsync();
-            consumer = null;
+        CompletableFuture<Void> closeProducer = new CompletableFuture<>();
+        CompletableFuture<Void> closeConsumer = new CompletableFuture<>();
+        if (producer == null) {
+            closeProducer.complete(null);
+        } else {
+            closeResource(() -> producer.closeAsync(), closeProducer);
         }
+        if (consumer == null) {
+            closeConsumer.complete(null);
+        } else {
+            closeResource(() -> consumer.closeAsync(), closeConsumer);
+        }
+
+        // Add logs.
+        closeProducer.thenRun(() -> log.info("Successfully close producer {}", topicName));
+        closeConsumer.thenRun(() -> log.info("Successfully close consumer {}", topicName));
+
+        closeFuture = FutureUtil.waitForAll(Arrays.asList(closeProducer, closeConsumer));
+        closeFuture.thenRun(() -> {
+            this.state = State.Closed;
+            log.info("Successfully close metadata store synchronizer {}", topicName);
+        });
+        return closeFuture;
+    }
+
+    private void closeResource(final Supplier<CompletableFuture<Void>> asyncCloseable,
+                               final CompletableFuture<Void> future) {
+        if (asyncCloseable == null) {
+            future.complete(null);
+            return;
+        }
+        asyncCloseable.get().whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                backOff.reset();
+                future.complete(null);
+                return;
+            }
+            // Retry.
+            long waitTimeMs = backOff.next();
+            log.warn("[{}] Exception: '{}' occurred while trying to close the %s. Retrying again in {} s.",
+                    topicName, ex.getMessage(), asyncCloseable.getClass().getSimpleName(), waitTimeMs / 1000.0, ex);
+            brokerService.executor().schedule(() -> closeResource(asyncCloseable, future), waitTimeMs,
+                    TimeUnit.MILLISECONDS);
+        });
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
new file mode 100644
index 0000000..9b4dd51
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+
+@Slf4j
+public abstract class GeoReplicationWithConfigurationSyncTestBase extends TestRetrySupport {
+
+    protected final String defaultTenant = "public";
+    protected final String defaultNamespace = defaultTenant + "/default";
+
+    protected final String cluster1 = "r1";
+    protected URL url1;
+    protected URL urlTls1;
+    protected ServiceConfiguration config1 = new ServiceConfiguration();
+    protected ZookeeperServerTest brokerConfigZk1;
+    protected LocalBookkeeperEnsemble bkEnsemble1;
+    protected PulsarService pulsar1;
+    protected BrokerService ns1;
+    protected PulsarAdmin admin1;
+    protected PulsarClient client1;
+
+    protected URL url2;
+    protected URL urlTls2;
+    protected final String cluster2 = "r2";
+    protected ServiceConfiguration config2 = new ServiceConfiguration();
+    protected ZookeeperServerTest brokerConfigZk2;
+    protected LocalBookkeeperEnsemble bkEnsemble2;
+    protected PulsarService pulsar2;
+    protected BrokerService ns2;
+    protected PulsarAdmin admin2;
+    protected PulsarClient client2;
+
+    protected void startZKAndBK() throws Exception {
+        // Start ZK.
+        brokerConfigZk1 = new ZookeeperServerTest(0);
+        brokerConfigZk1.start();
+        brokerConfigZk2 = new ZookeeperServerTest(0);
+        brokerConfigZk2.start();
+
+        // Start BK.
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble1.start();
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble2.start();
+    }
+
+    protected void startBrokers() throws Exception {
+        // Start brokers.
+        setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
+        pulsar1 = new PulsarService(config1);
+        pulsar1.start();
+        ns1 = pulsar1.getBrokerService();
+
+        url1 = new URL(pulsar1.getWebServiceAddress());
+        urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
+        admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
+        client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
+
+        // Start region 2
+        setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2);
+        pulsar2 = new PulsarService(config2);
+        pulsar2.start();
+        ns2 = pulsar2.getBrokerService();
+
+        url2 = new URL(pulsar2.getWebServiceAddress());
+        urlTls2 = new URL(pulsar2.getWebServiceAddressTls());
+        admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
+        client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
+    }
+
+    protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
+        admin1.clusters().createCluster(cluster1, ClusterData.builder()
+                .serviceUrl(url1.toString())
+                .serviceUrlTls(urlTls1.toString())
+                .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+                .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(false)
+                .build());
+        admin1.clusters().createCluster(cluster2, ClusterData.builder()
+                .serviceUrl(url2.toString())
+                .serviceUrlTls(urlTls2.toString())
+                .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+                .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(false)
+                .build());
+        admin2.clusters().createCluster(cluster1, ClusterData.builder()
+                .serviceUrl(url1.toString())
+                .serviceUrlTls(urlTls1.toString())
+                .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+                .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(false)
+                .build());
+        admin2.clusters().createCluster(cluster2, ClusterData.builder()
+                .serviceUrl(url2.toString())
+                .serviceUrlTls(urlTls2.toString())
+                .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+                .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(false)
+                .build());
+
+        admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
+                Sets.newHashSet(cluster1, cluster2)));
+        admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
+                Sets.newHashSet(cluster1, cluster2)));
+
+        admin1.namespaces().createNamespace(defaultNamespace);
+        admin2.namespaces().createNamespace(defaultNamespace);
+    }
+
+    @Override
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+
+        log.info("--- Starting OneWayReplicatorTestBase::setup ---");
+
+        startZKAndBK();
+
+        startBrokers();
+
+        createDefaultTenantsAndClustersAndNamespace();
+
+        Thread.sleep(100);
+        log.info("--- OneWayReplicatorTestBase::setup completed ---");
+    }
+
+    protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
+                                   LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setWebServicePort(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
+        config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort() + "/foo");
+        config.setBrokerDeleteInactiveTopicsEnabled(false);
+        config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
+        config.setBacklogQuotaCheckIntervalInSeconds(5);
+        config.setDefaultNumberOfNamespaceBundles(1);
+        config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        config.setEnableReplicatedSubscriptions(true);
+        config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+        config.setLoadBalancerSheddingEnabled(false);
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        // shutdown.
+        markCurrentSetupNumberCleaned();
+        log.info("--- Shutting down ---");
+
+        // Stop brokers.
+        if (client1 != null) {
+            client1.close();
+            client1 = null;
+        }
+        if (client2 != null) {
+            client2.close();
+            client2 = null;
+        }
+        if (admin1 != null) {
+            admin1.close();
+            admin1 = null;
+        }
+        if (admin2 != null) {
+            admin2.close();
+            admin2 = null;
+        }
+        if (pulsar2 != null) {
+            pulsar2.close();
+            pulsar2 = null;
+        }
+        if (pulsar1 != null) {
+            pulsar1.close();
+            pulsar1 = null;
+        }
+
+        // Stop ZK and BK.
+        if (bkEnsemble1 != null) {
+            bkEnsemble1.stop();
+            bkEnsemble1 = null;
+        }
+        if (bkEnsemble2 != null) {
+            bkEnsemble2.stop();
+            bkEnsemble2 = null;
+        }
+        if (brokerConfigZk1 != null) {
+            brokerConfigZk1.stop();
+            brokerConfigZk1 = null;
+        }
+        if (brokerConfigZk2 != null) {
+            brokerConfigZk2.stop();
+            brokerConfigZk2 = null;
+        }
+
+        // Reset configs.
+        config1 = new ServiceConfiguration();
+        config2 = new ServiceConfiguration();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java
new file mode 100644
index 0000000..577725f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.HashSet;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class SyncConfigStoreTest extends GeoReplicationWithConfigurationSyncTestBase {
+
+    private static final String CONF_NAME_SYNC_EVENT_TOPIC = "configurationMetadataSyncEventTopic";
+    private static final String SYNC_EVENT_TOPIC = TopicDomain.persistent.value() + "://" + SYSTEM_NAMESPACE
+            + "/__sync_config_meta";
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+        TenantInfoImpl tenantInfo = new TenantInfoImpl();
+        tenantInfo.setAllowedClusters(new HashSet<>(Arrays.asList(cluster1, cluster2)));
+        admin1.tenants().createTenant(TopicName.get(SYNC_EVENT_TOPIC).getTenant(), tenantInfo);
+        admin1.namespaces().createNamespace(TopicName.get(SYNC_EVENT_TOPIC).getNamespace());
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
+                                     LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
+    }
+
+    @Test
+    public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception {
+        // Verify the condition that supports synchronizer: the metadata store is a different one.
+        Awaitility.await().untilAsserted(() -> {
+            boolean shouldShutdownConfigurationMetadataStore =
+                    WhiteboxImpl.getInternalState(pulsar1, "shouldShutdownConfigurationMetadataStore");
+            assertTrue(shouldShutdownConfigurationMetadataStore);
+        });
+
+        // Verify the synchronizer will be created dynamically.
+        admin1.brokers().updateDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC, SYNC_EVENT_TOPIC);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic(), SYNC_EVENT_TOPIC);
+            PulsarMetadataEventSynchronizer synchronizer =
+                    WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer");
+            assertNotNull(synchronizer);
+            assertEquals(synchronizer.getState(), PulsarMetadataEventSynchronizer.State.Started);
+            assertTrue(synchronizer.isStarted());
+        });
+
+        PulsarMetadataEventSynchronizer synchronizerStarted =
+                WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer");
+        Producer<MetadataEvent> producerStarted =
+                WhiteboxImpl.getInternalState(synchronizerStarted, "producer");
+        Consumer<MetadataEvent> consumerStarted =
+                WhiteboxImpl.getInternalState(synchronizerStarted, "consumer");
+
+        // Verify the synchronizer will be closed dynamically.
+        admin1.brokers().deleteDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC);
+        Awaitility.await().untilAsserted(() -> {
+            // The synchronizer that was started will be closed.
+            assertEquals(synchronizerStarted.getState(), PulsarMetadataEventSynchronizer.State.Closed);
+            assertTrue(synchronizerStarted.isClosingOrClosed());
+            assertFalse(producerStarted.isConnected());
+            assertFalse(consumerStarted.isConnected());
+            // The synchronizer in memory will be null.
+            assertNull(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic());
+            PulsarMetadataEventSynchronizer synchronizer =
+                    WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer");
+            assertNull(synchronizer);
+        });
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
index 9a735e0..cababd0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
@@ -49,5 +49,5 @@
     /**
      * close synchronizer resources.
      */
-    void close();
+    CompletableFuture<Void> closeAsync();
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index e565ba3..182c14e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -84,6 +84,8 @@
         return Optional.empty();
     }
 
+    default void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {}
+
     /**
      * Handles a metadata synchronizer event.
      *
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 7a495f7..3909a89 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -82,8 +82,7 @@
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
-        synchronizer = metadataStoreConfig.getSynchronizer();
-        registerSyncListener(Optional.ofNullable(synchronizer));
+        updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer());
         if ("local".equals(name)) {
             map = new TreeMap<>();
             sequentialIdGenerator = new AtomicLong();
@@ -234,6 +233,12 @@
     }
 
     @Override
+    public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {
+        this.synchronizer = synchronizer;
+        registerSyncListener(Optional.ofNullable(synchronizer));
+    }
+
+    @Override
     public void close() throws Exception {
         if (isClosed.compareAndSet(false, true)) {
             super.close();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index be98512..39f7edd 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -112,8 +112,7 @@
         // Create a new store instance
         store = new RocksdbMetadataStore(metadataStoreUri, conf);
         // update synchronizer and register sync listener
-        store.synchronizer = conf.getSynchronizer();
-        store.registerSyncListener(Optional.ofNullable(store.synchronizer));
+        store.updateMetadataEventSynchronizer(conf.getSynchronizer());
         instancesCache.put(metadataStoreUri, store);
         return store;
     }
@@ -572,6 +571,12 @@
     public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
         return Optional.ofNullable(synchronizer);
     }
+
+    @Override
+    public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {
+        this.synchronizer = synchronizer;
+        registerSyncListener(Optional.ofNullable(synchronizer));
+    }
 }
 
 class RocksdbMetadataStoreProvider implements MetadataStoreProvider {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index a164e4c..5b45530 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -52,7 +52,7 @@
     private final int maxDelayMillis;
     private final int maxOperations;
     private final int maxSize;
-    private final MetadataEventSynchronizer synchronizer;
+    private MetadataEventSynchronizer synchronizer;
     private final BatchMetadataStoreStats batchMetadataStoreStats;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
@@ -75,8 +75,7 @@
         }
 
         // update synchronizer and register sync listener
-        synchronizer = conf.getSynchronizer();
-        registerSyncListener(Optional.ofNullable(synchronizer));
+        updateMetadataEventSynchronizer(conf.getSynchronizer());
         this.batchMetadataStoreStats =
                 new BatchMetadataStoreStats(metadataStoreName, executor);
     }
@@ -161,6 +160,12 @@
         return Optional.ofNullable(synchronizer);
     }
 
+    @Override
+    public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {
+        this.synchronizer = synchronizer;
+        registerSyncListener(Optional.ofNullable(synchronizer));
+    }
+
     private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
         if (enabled) {
             if (!queue.offer(op)) {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
index 3fabe964..caca16f 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
@@ -206,8 +206,8 @@
         }
 
         @Override
-        public void close() {
-            // No-op
+        public CompletableFuture<Void> closeAsync() {
+            return CompletableFuture.completedFuture(null);
         }
 
     }