[RIP-81] Master-slave metadata incremental synchronization (#9617)

* [ISSUE #9616] Master-slave metadata incremental synchronization

* commit

* Fix test, adapt topic and subscriptionConfig's rocksdb mode, optimize code

* Fix test

* Fix test

* Fix test

* Fix test

* 1. Simplified log printing
2. Lengthened the snapshot storage interval to reduce the extra garbage collection caused by deep copies
3. Added DataVersion transmission
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7b1701c..32be940 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,8 +16,10 @@
  */
 package org.apache.rocketmq.broker;
 
+import com.alibaba.fastjson.JSON;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory;
 import org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
 import org.apache.rocketmq.auth.authorization.factory.AuthorizationFactory;
@@ -84,6 +86,11 @@
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
+import org.apache.rocketmq.broker.sync.SyncMessageProducer;
+import org.apache.rocketmq.broker.sync.SyncMetadataChangeObserver;
 import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
@@ -110,6 +117,7 @@
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.stats.MomentStatsItem;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.ServiceProvider;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -156,11 +164,19 @@
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.timer.TimerMetrics;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -238,6 +254,7 @@
     protected final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
     protected final BlockingQueue<Runnable> adminBrokerThreadPoolQueue;
     protected final BlockingQueue<Runnable> loadBalanceThreadPoolQueue;
+    protected final BlockingQueue<Runnable> metadataIncrementalSyncThreadPoolQueue;
     protected BrokerStatsManager brokerStatsManager;
     protected final List<SendMessageHook> sendMessageHookList = new ArrayList<>();
     protected final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
@@ -266,6 +283,7 @@
     protected ExecutorService consumerManageExecutor;
     protected ExecutorService loadBalanceExecutor;
     protected ExecutorService endTransactionExecutor;
+    protected ExecutorService metadataIncrementalSyncExecutor;
     protected boolean updateMasterHAServerAddrPeriodically = false;
     private BrokerStats brokerStats;
     private InetSocketAddress storeHost;
@@ -300,6 +318,7 @@
     private TransactionMetricsFlushService transactionMetricsFlushService;
     private AuthenticationMetadataManager authenticationMetadataManager;
     private AuthorizationMetadataManager authorizationMetadataManager;
+    private MetadataChangeObserver metadataChangeObserver;
 
     private ConfigContext configContext;
 
@@ -421,6 +440,14 @@
         this.loadBalanceThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getLoadBalanceThreadPoolQueueCapacity());
 
         this.brokerFastFailure = new BrokerFastFailure(this);
+        if (this.brokerConfig.isAllowMetadataIncrementalSync() && this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
+            SyncMessageProducer syncMessageProducer = new SyncMessageProducer(this);
+            this.metadataIncrementalSyncThreadPoolQueue = new LinkedBlockingQueue<>(10);
+            metadataChangeObserver = new SyncMetadataChangeObserver(syncMessageProducer);
+        } else {
+            metadataChangeObserver = new NoopMetadataChangeObserver();
+            this.metadataIncrementalSyncThreadPoolQueue = null;
+        }
 
         String brokerConfigPath;
         if (brokerConfig.getBrokerConfigPath() != null && !brokerConfig.getBrokerConfigPath().isEmpty()) {
@@ -637,6 +664,16 @@
             this.loadBalanceThreadPoolQueue,
             new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
 
+        if (this.brokerConfig.isAllowMetadataIncrementalSync() && this.brokerConfig.getBrokerId() == 0L) {
+            this.metadataIncrementalSyncExecutor = ThreadUtils.newThreadPoolExecutor(
+                    this.brokerConfig.getMetadataIncrementalSyncThreadPoolNums(),
+                    this.brokerConfig.getMetadataIncrementalSyncThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.metadataIncrementalSyncThreadPoolQueue,
+                    new ThreadFactoryImpl("MetadataIncrementalSyncThread_", getBrokerIdentity()));
+        }
+
         this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
         this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1,
@@ -747,7 +784,11 @@
                     public void run() {
                         try {
                             if (System.currentTimeMillis() - lastSyncTimeMs > 60 * 1000) {
-                                BrokerController.this.getSlaveSynchronize().syncAll();
+                                if (!getBrokerConfig().isAllowMetadataIncrementalSync()) {
+                                    BrokerController.this.getSlaveSynchronize().syncAll();
+                                } else {
+                                    BrokerController.this.getSlaveSynchronize().start();
+                                }
                                 lastSyncTimeMs = System.currentTimeMillis();
                             }
 
@@ -775,12 +816,164 @@
                 }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
             }
         }
+        if (this.brokerConfig.isAllowMetadataIncrementalSync() && this.brokerConfig.getBrokerId() == 0L) {
+
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    this.consumerOffsetManager.checkAndSync();
+                } catch (Throwable e) {
+                    LOG.error("Failed to execute scheduled sync task", e);
+                }
+            }, 1000, 1000, TimeUnit.MILLISECONDS);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    deleteOutdatedSnapshot();
+                    persistBrokerConfigSnapshotsSeparately();
+                } catch (Throwable e) {
+                    LOG.error("Failed to execute scheduled sync task", e);
+                }
+            }, 10, this.brokerConfig.getSnapshotIntervalSeconds(), TimeUnit.SECONDS);
+        }
 
         if (this.brokerConfig.isEnableControllerMode()) {
             this.updateMasterHAServerAddrPeriodically = true;
         }
     }
 
+
+    /**
+     * Persists one snapshot file per metadata type under {@code <storePathRootDir>/snapshot}.
+     * <p>
+     * Each snapshot is built and written on {@code metadataIncrementalSyncExecutor} and combines a deep copy
+     * of the metadata table (plus its data version where one is recorded) with the max offset of queue 0 of
+     * the matching sync topic. The offset is read before the copy is taken.
+     * <p>
+     * Directory creation and task submission are guarded separately, so a rejected submission is neither
+     * logged as a directory failure nor allowed to cancel the snapshots that follow it.
+     */
+    private void persistBrokerConfigSnapshotsSeparately() {
+        LOG.info("Starting to persist broker config snapshots separately...");
+
+        String snapshotDir = messageStoreConfig.getStorePathRootDir() + File.separator + "snapshot";
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+        String timestamp = dateFormat.format(new Date());
+
+        // Make sure the target directory exists before any work is queued.
+        try {
+            Path dirPath = Paths.get(snapshotDir);
+            if (Files.notExists(dirPath)) {
+                Files.createDirectories(dirPath);
+            }
+        } catch (Throwable e) {
+            LOG.error("Failed to create snapshot directory", e);
+            return;
+        }
+
+        // Most tables are stored as (table, dataVersion, maxOffset).
+        submitSnapshot(MixAll.SNAPSHOT_NAME_TOPIC_CONFIG, TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, snapshotDir, timestamp,
+            maxOffset -> Triple.of(topicConfigManager.deepCopyTopicConfigTable(), topicConfigManager.getDataVersion(), maxOffset));
+
+        submitSnapshot(MixAll.SNAPSHOT_NAME_SUBSCRIPTION_GROUP, TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, snapshotDir, timestamp,
+            maxOffset -> Triple.of(subscriptionGroupManager.deepCopySubscriptionGroupTable(), subscriptionGroupManager.getDataVersion(), maxOffset));
+
+        submitSnapshot(MixAll.SNAPSHOT_NAME_CONSUMER_OFFSET, TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, snapshotDir, timestamp,
+            maxOffset -> Triple.of(consumerOffsetManager.deepCopyOffsetTableSnapshot(), consumerOffsetManager.getDataVersion(), maxOffset));
+
+        submitSnapshot(MixAll.SNAPSHOT_NAME_DELAY_OFFSET, TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, snapshotDir, timestamp,
+            maxOffset -> Triple.of(this.getScheduleMessageService().deepCopyDelayOffsetTable(), this.getScheduleMessageService().getDataVersion(), maxOffset));
+
+        // Message request modes are stored as (table, maxOffset), without a data version.
+        submitSnapshot(MixAll.SNAPSHOT_NAME_MESSAGE_MODE, TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, snapshotDir, timestamp,
+            maxOffset -> Pair.of(this.queryAssignmentProcessor.getMessageRequestModeManager().deepCopyMessageRequestModeMapSnapshot(), maxOffset));
+
+        submitSnapshot(MixAll.SNAPSHOT_NAME_TIMER_METRICS, TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, snapshotDir, timestamp,
+            maxOffset -> Triple.of(this.messageStore.getTimerMessageStore().getTimerMetrics().deepCopyTimingCountSnapshot(),
+                this.messageStore.getTimerMessageStore().getTimerMetrics().getDataVersion(), maxOffset));
+    }
+
+    /**
+     * Queues one snapshot task on the metadata sync executor. The task reads the max offset first, then
+     * builds and writes the snapshot. Failures to submit and failures inside the task are logged with
+     * the snapshot name and never rethrown.
+     *
+     * @param snapshotName    file name prefix of the snapshot, also used in log messages
+     * @param syncTopic       sync topic whose queue 0 max offset is stored with the snapshot
+     * @param snapshotDir     directory that receives the snapshot file
+     * @param timestamp       timestamp suffix shared by all snapshot files of this round
+     * @param snapshotBuilder builds the object to serialize from the max offset read inside the task
+     */
+    private void submitSnapshot(String snapshotName, String syncTopic, String snapshotDir, String timestamp,
+        java.util.function.LongFunction<Object> snapshotBuilder) {
+        try {
+            submitSnapshotTask(() -> {
+                try {
+                    long maxOffset = this.messageStore.getMaxOffsetInQueue(syncTopic, 0);
+                    writeSnapshotToFile(snapshotName, snapshotBuilder.apply(maxOffset), snapshotDir, timestamp);
+                } catch (Throwable e) {
+                    LOG.error("Failed to persist {} snapshot", snapshotName, e);
+                }
+            });
+        } catch (Throwable e) {
+            // e.g. a rejection when the executor's bounded queue is full
+            LOG.error("Failed to submit {} snapshot task", snapshotName, e);
+        }
+    }
+
+    /** Serializes {@code snapshotData} to JSON and writes it as UTF-8 to {@code <snapshotDir>/<snapshotName>_snapshot_<timestamp>.json}. */
+    private void writeSnapshotToFile(String snapshotName, Object snapshotData, String snapshotDir, String timestamp) throws Exception {
+        byte[] payload = JSON.toJSONString(snapshotData).getBytes(StandardCharsets.UTF_8);
+        Path target = Paths.get(snapshotDir, String.format("%s_snapshot_%s.json", snapshotName, timestamp));
+        Files.write(target, payload);
+    }
+
+    /** Hands {@code task} to {@code metadataIncrementalSyncExecutor}; the returned future is not tracked. */
+    private void submitSnapshotTask(Runnable task) {
+        this.metadataIncrementalSyncExecutor.submit(task);
+    }
+
+    /**
+     * Keeps, for each name in {@code MixAll.SNAPSHOT_NAMES}, only the most recently modified file whose
+     * name starts with it and deletes the rest. Errors are logged and never propagated to the caller.
+     */
+    private void deleteOutdatedSnapshot() {
+        try {
+            String snapshotDir = messageStoreConfig.getStorePathRootDir() + File.separator + "snapshot";
+            Path dirPath = Paths.get(snapshotDir);
+            if (Files.notExists(dirPath)) {
+                return;
+            }
+            MixAll.SNAPSHOT_NAMES.forEach(name -> {
+                List<Path> snapshotFiles;
+                // Files.list keeps the directory open until its stream is closed, so close it explicitly.
+                try (java.util.stream.Stream<Path> listing = Files.list(dirPath)) {
+                    snapshotFiles = listing
+                            .filter(path -> path.getFileName().toString().startsWith(name))
+                            .sorted((p1, p2) -> {
+                                try {
+                                    return Files.getLastModifiedTime(p2).compareTo(Files.getLastModifiedTime(p1));
+                                } catch (IOException e) {
+                                    return 0;
+                                }
+                            })
+                            .collect(Collectors.toList());
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                for (int i = 1; i < snapshotFiles.size(); i++) {
+                    try {
+                        Files.delete(snapshotFiles.get(i));
+                    } catch (IOException e) {
+                        LOG.warn("Failed to delete {}", snapshotFiles.get(i), e);
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            LOG.error("Error in snapshot cleanup", e);
+        }
+    }
+
+
     protected void initializeScheduledTasks() {
 
         initializeBrokerScheduledTasks();
@@ -862,7 +1055,7 @@
             this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
             if (messageStoreConfig.isTimerWheelEnable()) {
                 this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
-                TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
+                TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()), metadataChangeObserver);
                 this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
                 this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
                 this.messageStore.setTimerMessageStore(this.timerMessageStore);
@@ -1381,6 +1574,14 @@
         this.subscriptionGroupManager = subscriptionGroupManager;
     }
 
+    public MetadataChangeObserver getMetadataChangeObserver() {
+        return metadataChangeObserver;
+    }
+
+    public void setMetadataChangeObserver(MetadataChangeObserver metadataChangeObserver) {
+        this.metadataChangeObserver = metadataChangeObserver;
+    }
+
     public SubscriptionGroupManager getSubscriptionGroupManager() {
         return subscriptionGroupManager;
     }
@@ -1567,7 +1768,7 @@
         if (this.transactionalMessageCheckService != null) {
             this.transactionalMessageCheckService.shutdown(false);
         }
-        
+
         if (this.loadBalanceExecutor != null) {
             this.loadBalanceExecutor.shutdown();
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index f7e0de9..e414263 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -128,8 +128,7 @@
     @Override
     public SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
         String groupName = subscriptionGroupConfig.getGroupName();
-        SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
-
+        SubscriptionGroupConfig oldConfig = super.putSubscriptionGroupConfig(subscriptionGroupConfig);
         try {
             byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
             byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
@@ -143,7 +142,7 @@
     @Override
     protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) {
         String groupName = subscriptionGroupConfig.getGroupName();
-        SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.putIfAbsent(groupName, subscriptionGroupConfig);
+        SubscriptionGroupConfig oldConfig = super.putSubscriptionGroupConfigIfAbsent(subscriptionGroupConfig);
         if (oldConfig == null) {
             try {
                 byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
@@ -158,7 +157,7 @@
 
     @Override
     protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
-        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.remove(groupName);
+        SubscriptionGroupConfig subscriptionGroupConfig = super.removeSubscriptionGroupConfig(groupName);
         try {
             this.rocksDBConfigManager.delete(groupName.getBytes(DataConverter.CHARSET_UTF8));
         } catch (Exception e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index d64f808..591cac1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -110,7 +110,8 @@
     @Override
     public TopicConfig putTopicConfig(TopicConfig topicConfig) {
         String topicName = topicConfig.getTopicName();
-        TopicConfig oldTopicConfig = this.topicConfigTable.put(topicName, topicConfig);
+        TopicConfig oldTopicConfig = super.putTopicConfig(topicConfig);
+
         try {
             byte[] keyBytes = topicName.getBytes(DataConverter.CHARSET_UTF8);
             byte[] valueBytes = JSON.toJSONBytes(topicConfig, SerializerFeature.BrowserCompatible);
@@ -123,7 +124,7 @@
 
     @Override
     protected TopicConfig removeTopicConfig(String topicName) {
-        TopicConfig topicConfig = this.topicConfigTable.remove(topicName);
+        TopicConfig topicConfig = super.removeTopicConfig(topicName);
         try {
             this.rocksDBConfigManager.delete(topicName.getBytes(DataConverter.CHARSET_UTF8));
         } catch (Exception e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index f22f22a..7dc526e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -354,7 +354,11 @@
             slaveSyncFuture = this.brokerController.getScheduledExecutorService().scheduleAtFixedRate(() -> {
                 try {
                     if (System.currentTimeMillis() - lastSyncTimeMs > 10 * 1000) {
-                        brokerController.getSlaveSynchronize().syncAll();
+                        if (!this.brokerController.getBrokerConfig().isAllowMetadataIncrementalSync()) {
+                            this.brokerController.getSlaveSynchronize().syncAll();
+                        } else {
+                            this.brokerController.getSlaveSynchronize().start();
+                        }
                         lastSyncTimeMs = System.currentTimeMillis();
                     }
                     //timer checkpoint, latency-sensitive, so sync it more frequently
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
index e6cb976..7ed3b97 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
@@ -114,7 +114,11 @@
                 public void run() {
                     try {
                         if (System.currentTimeMillis() - lastSyncTimeMs > 10 * 1000) {
-                            brokerController.getSlaveSynchronize().syncAll();
+                            if (!brokerController.getBrokerConfig().isAllowMetadataIncrementalSync()) {
+                                brokerController.getSlaveSynchronize().syncAll();
+                            } else {
+                                brokerController.getSlaveSynchronize().start();
+                            }
                             lastSyncTimeMs = System.currentTimeMillis();
                         }
                         //timer checkpoint, latency-sensitive, so sync it more frequently
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
index 0c69e2d..d86d226 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
@@ -16,10 +16,12 @@
  */
 package org.apache.rocketmq.broker.loadbalance;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
 
@@ -42,15 +44,34 @@
         ConcurrentHashMap<String, SetMessageRequestModeRequestBody> consumerGroup2ModeMap = messageRequestModeMap.get(topic);
         if (consumerGroup2ModeMap == null) {
             consumerGroup2ModeMap = new ConcurrentHashMap<>();
-            ConcurrentHashMap<String, SetMessageRequestModeRequestBody> pre =
-                messageRequestModeMap.putIfAbsent(topic, consumerGroup2ModeMap);
-            if (pre != null) {
-                consumerGroup2ModeMap = pre;
+            ConcurrentHashMap<String, SetMessageRequestModeRequestBody>[] pre = new ConcurrentHashMap[1];
+            ConcurrentHashMap<String, SetMessageRequestModeRequestBody> finalConsumerGroup2ModeMap = consumerGroup2ModeMap;
+            messageRequestModeMap.compute(topic, (key, existingValue) -> {
+                if (existingValue == null) {
+                    notifyMessageRequestModeCreated(requestBody);
+                    pre[0] = null;
+                    return finalConsumerGroup2ModeMap;
+                } else {
+                    notifyMessageRequestModeUpdated(requestBody);
+                    pre[0] = existingValue;
+                    return existingValue;
+                }
+            });
+            if (pre[0] != null) {
+                consumerGroup2ModeMap = pre[0];
             }
         }
         consumerGroup2ModeMap.put(consumerGroup, requestBody);
     }
 
+    private void notifyMessageRequestModeCreated(SetMessageRequestModeRequestBody requestBody) {
+        brokerController.getMetadataChangeObserver().onCreated(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, requestBody.getTopic(), requestBody);
+    }
+
+    private void notifyMessageRequestModeUpdated(SetMessageRequestModeRequestBody requestBody) {
+        brokerController.getMetadataChangeObserver().onUpdated(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, requestBody.getTopic(), requestBody);
+    }
+
     public SetMessageRequestModeRequestBody getMessageRequestMode(String topic, String consumerGroup) {
         ConcurrentHashMap<String, SetMessageRequestModeRequestBody> consumerGroup2ModeMap = messageRequestModeMap.get(topic);
         if (consumerGroup2ModeMap != null) {
@@ -92,4 +113,31 @@
     public String encode(boolean prettyFormat) {
         return RemotingSerializable.toJson(this, prettyFormat);
     }
+
+    /**
+     * Returns an independent topic -> consumerGroup -> request-mode copy; a body that cannot be cloned is reused as-is.
+     */
+    public ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> deepCopyMessageRequestModeMapSnapshot() {
+        ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> snapshot = new ConcurrentHashMap<>(this.messageRequestModeMap.size());
+        this.messageRequestModeMap.forEach((topic, groupModes) -> {
+            if (groupModes == null) {
+                return;
+            }
+            ConcurrentHashMap<String, SetMessageRequestModeRequestBody> copiedModes = new ConcurrentHashMap<>(groupModes.size());
+            groupModes.forEach((group, body) -> {
+                if (body == null) {
+                    return;
+                }
+                SetMessageRequestModeRequestBody copy;
+                try {
+                    copy = body.clone();
+                } catch (CloneNotSupportedException e) {
+                    copy = body;
+                }
+                copiedModes.put(group, copy);
+            });
+            snapshot.put(topic, copiedModes);
+        });
+        return snapshot;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index e1e1cb4..e74428c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.offset;
 
 import com.google.common.collect.Maps;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -25,6 +26,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Strings;
@@ -36,6 +38,7 @@
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
@@ -54,7 +57,19 @@
         new ConcurrentHashMap<>(512);
 
     private final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> pullOffsetTable =
-        new ConcurrentHashMap<>(512);
+            new ConcurrentHashMap<>(512);
+
+    private volatile ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> workingBuffer =
+            new ConcurrentHashMap<>(512);
+
+    private volatile ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> syncBuffer =
+            new ConcurrentHashMap<>(512);
+
+    private final Object bufferSwapLock = new Object();
+
+    private transient ScheduledExecutorService scheduledExecutorService;
+
+    private final transient AtomicLong lastSyncTimestamp = new AtomicLong(System.currentTimeMillis());
 
     protected transient BrokerController brokerController;
 
@@ -70,6 +85,36 @@
     protected void removeConsumerOffset(String topicAtGroup) {
 
     }
+    private void triggerBufferSwapAndSync() {
+        synchronized (bufferSwapLock) {
+            ConcurrentMap<String, ConcurrentMap<Integer, Long>> oldSyncBuffer = this.syncBuffer;
+            this.syncBuffer = this.workingBuffer;
+            this.workingBuffer = new ConcurrentHashMap<>(512);
+            if (oldSyncBuffer != null && !oldSyncBuffer.isEmpty()) {
+                oldSyncBuffer.forEach((key, map) ->
+                        this.syncBuffer.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).putAll(map));
+            }
+        }
+        if (!this.syncBuffer.isEmpty()) {
+            try {
+                this.brokerController.getMetadataChangeObserver().onUpdated(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, String.valueOf(dataVersion), this.syncBuffer);
+            } catch (Exception e) {
+                LOG.error("Failed to sync consumer offsets.", e);
+            } finally {
+                this.lastSyncTimestamp.set(System.currentTimeMillis());
+                this.syncBuffer.clear();
+            }
+        }
+    }
+
+    /** Publishes buffered offsets when more than one second has passed since the last sync and the working buffer holds data. */
+    public void checkAndSync() {
+        if (System.currentTimeMillis() - lastSyncTimestamp.get() <= 1000 || workingBuffer.isEmpty()) {
+            return;
+        }
+        LOG.debug("Consumer offset sync triggered by time interval.");
+        this.triggerBufferSwapAndSync();
+    }
 
     public void cleanOffset(String group) {
         Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
@@ -216,9 +261,12 @@
                 LOG.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
             }
         }
+        ConcurrentMap<Integer, Long> workingMap = this.workingBuffer.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32));
+        workingMap.put(queueId, offset);
         if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep() == 0) {
             long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
             dataVersion.nextVersion(stateMachineVersion);
+            this.triggerBufferSwapAndSync();
         }
     }
 
@@ -370,7 +418,10 @@
     public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
         ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
         if (offsets != null) {
-            this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<>(offsets));
+            ConcurrentMap<Integer, Long> newOffsets = new ConcurrentHashMap<>(offsets);
+            this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, newOffsets);
+            this.workingBuffer.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<>(newOffsets));
+            this.triggerBufferSwapAndSync();
         }
     }
 
@@ -418,13 +469,15 @@
             }
             return removed;
         };
-
         boolean clearOffset = deleteFunction.apply(this.offsetTable.entrySet().iterator());
+        boolean clearWorking = deleteFunction.apply(this.workingBuffer.entrySet().iterator());
+        boolean clearSync = deleteFunction.apply(this.syncBuffer.entrySet().iterator());
         boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator());
         boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator());
 
         LOG.info("Consumer offset manager clean group offset, groupName={}, " +
-            "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
+                        "offsetTable={}, workingBuffer={}, syncBuffer={}, resetOffsetTable={}, pullOffsetTable={}",
+                group, clearOffset, clearWorking, clearSync, clearReset, clearPull);
     }
 
     public void assignResetOffset(String topic, String group, int queueId, long offset) {
@@ -443,6 +496,8 @@
         // 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
         // sense in cases like clients are offline.
         offsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
+        workingBuffer.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(queueId, offset);
+        this.triggerBufferSwapAndSync();
     }
 
     public boolean hasOffsetReset(String topic, String group, int queueId) {
@@ -463,4 +518,19 @@
             return map.remove(queueId);
         }
     }
+
+    /**
+     * Copies {@code offsetTable} two levels deep: the outer map and every inner
+     * queueId-to-offset map are new instances, so later changes to the live table do not
+     * show up in the returned copy.
+     *
+     * @return the copied table; entries whose inner map is {@code null} are left out
+     */
+    public ConcurrentMap<String, ConcurrentMap<Integer, Long>> deepCopyOffsetTableSnapshot() {
+        ConcurrentMap<String, ConcurrentMap<Integer, Long>> snapshot = new ConcurrentHashMap<>(this.offsetTable.size());
+        this.offsetTable.forEach((topicAtGroup, queueOffsets) -> {
+            if (queueOffsets != null) {
+                ConcurrentMap<Integer, Long> copiedOffsets = new ConcurrentHashMap<>(queueOffsets.size());
+                copiedOffsets.putAll(queueOffsets);
+                snapshot.put(topicAtGroup, copiedOffsets);
+            }
+        });
+        return snapshot;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 21ba349..329245f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -17,11 +17,14 @@
 package org.apache.rocketmq.broker.out;
 
 import com.alibaba.fastjson2.JSON;
+
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -35,6 +38,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import com.alibaba.fastjson2.TypeReference;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -100,6 +105,7 @@
 import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
@@ -1627,6 +1633,271 @@
         return pullResult;
     }
 
+    /**
+     * Requests the topic-config snapshot from the given broker with a synchronous call
+     * (3000 ms timeout) and converts the JSON body into typed values.
+     *
+     * @param brokerAddr address of the broker to query
+     * @return a triple of (topic name to {@link TopicConfig}, data version, max offset), or
+     *         {@code null} when the response has no body
+     * @throws Exception {@link MQBrokerException} when the response code is not SUCCESS;
+     *                   remoting and snapshot-level parsing failures propagate unchanged
+     */
+    public Triple<ConcurrentMap<String, TopicConfig>, DataVersion, Long> getTopicConfigSnapShot(String brokerAddr)
+            throws Exception {
+        RemotingCommand snapshotRequest = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG_SNAPSHOT, null);
+        RemotingCommand snapshotResponse = this.remotingClient.invokeSync(brokerAddr, snapshotRequest, 3000);
+        if (snapshotResponse.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(snapshotResponse.getCode(), snapshotResponse.getRemark());
+        }
+
+        byte[] payload = snapshotResponse.getBody();
+        if (payload == null || payload.length == 0) {
+            return null;
+        }
+
+        String snapshotJson = new String(payload, StandardCharsets.UTF_8);
+        Triple<LinkedHashMap<String, Object>, DataVersion, Long> parsed = JSON.parseObject(snapshotJson,
+                new TypeReference<Triple<LinkedHashMap<String, Object>, DataVersion, Long>>() {
+                });
+
+        ConcurrentMap<String, TopicConfig> configsByTopic = new ConcurrentHashMap<>();
+        LinkedHashMap<String, Object> rawConfigs = parsed.getLeft();
+        if (rawConfigs != null) {
+            for (Map.Entry<String, Object> rawEntry : rawConfigs.entrySet()) {
+                String topicName = rawEntry.getKey();
+                Object rawConfig = rawEntry.getValue();
+                if (rawConfig == null) {
+                    continue;
+                }
+                try {
+                    // Re-serialize the loosely typed value so it can be parsed as a TopicConfig.
+                    TopicConfig topicConfig = JSON.parseObject(JSON.toJSONString(rawConfig), TopicConfig.class);
+                    if (topicConfig == null) {
+                        continue;
+                    }
+                    TopicConfig stored;
+                    try {
+                        stored = topicConfig.clone();
+                    } catch (CloneNotSupportedException e) {
+                        // Fall back to the parsed instance when it cannot be cloned.
+                        stored = topicConfig;
+                    }
+                    configsByTopic.put(topicName, stored);
+                } catch (Exception e) {
+                    // A single unusable entry is logged and skipped; the remaining entries are still returned.
+                    LOGGER.warn("Failed to parse TopicConfig for topic: {}, error: {}", topicName, e.getMessage());
+                }
+            }
+        }
+
+        DataVersion dataVersion = JSON.to(DataVersion.class, parsed.getMiddle());
+        Long maxOffset = JSON.to(Long.class, parsed.getRight());
+        return Triple.of(configsByTopic, dataVersion, maxOffset);
+    }
+
+
+    /**
+     * Requests the consumer-offset snapshot from the given broker with a synchronous call
+     * (3000 ms timeout) and rebuilds it as concurrent maps.
+     *
+     * <p>Entries whose inner map is {@code null} in the payload are logged and skipped: copying
+     * a {@code null} map into a {@link ConcurrentHashMap} throws a NullPointerException, which
+     * would otherwise discard the whole snapshot.
+     *
+     * @param brokerAddr address of the broker to query
+     * @return a triple of (offset table, data version, max offset), or {@code null} when the
+     *         response has no body
+     * @throws Exception {@link MQBrokerException} when the response code is not SUCCESS;
+     *                   remoting and parsing failures propagate unchanged
+     */
+    public Triple<ConcurrentMap<String, ConcurrentMap<Integer, Long>>, DataVersion, Long> getConsumerOffsetSnapShot(String brokerAddr)
+            throws Exception {
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_OFFSET_SNAPSHOT, null);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        byte[] body = response.getBody();
+        if (body == null || body.length == 0) {
+            return null;
+        }
+        String jsonString = new String(body, StandardCharsets.UTF_8);
+        Triple<LinkedHashMap<String, LinkedHashMap<Integer, Long>>, DataVersion, Long> rawSnapshotData =
+                JSON.parseObject(
+                        jsonString,
+                        new TypeReference<Triple<LinkedHashMap<String, LinkedHashMap<Integer, Long>>, DataVersion, Long>>() {
+                        }
+                );
+
+        ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
+        LinkedHashMap<String, LinkedHashMap<Integer, Long>> linkedMap = rawSnapshotData.getLeft();
+
+        if (linkedMap != null) {
+            for (Map.Entry<String, LinkedHashMap<Integer, Long>> outerEntry : linkedMap.entrySet()) {
+                LinkedHashMap<Integer, Long> innerMap = outerEntry.getValue();
+                if (innerMap == null) {
+                    // ConcurrentHashMap cannot be built from a null map; keep the other entries.
+                    LOGGER.warn("Skip consumer offset entry without offsets in snapshot, key: {}", outerEntry.getKey());
+                    continue;
+                }
+                ConcurrentMap<Integer, Long> innerConcurrentMap = new ConcurrentHashMap<>(innerMap);
+                offsetTable.put(outerEntry.getKey(), innerConcurrentMap);
+            }
+        }
+        DataVersion dataVersion =
+                JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+        Long maxOffset =
+                JSON.to(Long.class, rawSnapshotData.getRight());
+
+        return Triple.of(offsetTable, dataVersion, maxOffset);
+    }
+
+    /**
+     * Requests the subscription-group snapshot from the given broker with a synchronous call
+     * (3000 ms timeout) and converts each entry into a {@link SubscriptionGroupConfig}.
+     *
+     * <p>Entries that parse to {@code null} are logged and skipped, because
+     * {@link ConcurrentHashMap} rejects null values and one such entry would otherwise abort
+     * the whole snapshot with a NullPointerException.
+     *
+     * @param brokerAddr address of the broker to query
+     * @return a triple of (group name to config, data version, max offset), or {@code null}
+     *         when the response has no body
+     * @throws Exception {@link MQBrokerException} when the response code is not SUCCESS;
+     *                   remoting and parsing failures propagate unchanged
+     */
+    public Triple<ConcurrentMap<String, SubscriptionGroupConfig>, DataVersion, Long> getSubscriptionGroupSnapShot(String brokerAddr)
+            throws Exception {
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTION_GROUP_SNAPSHOT, null);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        byte[] body = response.getBody();
+        if (body == null || body.length == 0) {
+            return null;
+        }
+
+        String jsonString = new String(body, StandardCharsets.UTF_8);
+        Triple<LinkedHashMap<String, Object>, DataVersion, Long> rawSnapshotData =
+                JSON.parseObject(
+                        jsonString,
+                        new TypeReference<Triple<LinkedHashMap<String, Object>, DataVersion, Long>>() {
+                        }
+                );
+
+        ConcurrentMap<String, SubscriptionGroupConfig> subGroupTable = new ConcurrentHashMap<>();
+        LinkedHashMap<String, Object> linkedMap = rawSnapshotData.getLeft();
+        if (linkedMap != null) {
+            for (Map.Entry<String, Object> entry : linkedMap.entrySet()) {
+                String groupName = entry.getKey();
+                // Re-serialize the loosely typed value so it can be parsed as a SubscriptionGroupConfig.
+                String configJson = JSON.toJSONString(entry.getValue());
+                SubscriptionGroupConfig newConfig = JSON.parseObject(configJson, SubscriptionGroupConfig.class);
+                if (newConfig == null) {
+                    LOGGER.warn("Skip subscription group without config in snapshot, group: {}", groupName);
+                    continue;
+                }
+
+                subGroupTable.put(groupName, newConfig);
+            }
+        }
+
+        DataVersion dataVersion =
+                JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+        Long maxOffset =
+                JSON.to(Long.class, rawSnapshotData.getRight());
+
+        return Triple.of(subGroupTable, dataVersion, maxOffset);
+    }
+
+    /**
+     * Requests the delay-offset snapshot from the given broker with a synchronous call
+     * (3000 ms timeout) and converts each value to a {@code Long}.
+     *
+     * <p>Entries whose value converts to {@code null} are logged and skipped, because
+     * {@link ConcurrentHashMap} rejects null values and one such entry would otherwise abort
+     * the whole snapshot with a NullPointerException.
+     *
+     * @param brokerAddr address of the broker to query
+     * @return a triple of (key to offset, data version, max offset), or {@code null} when the
+     *         response has no body
+     * @throws Exception {@link MQBrokerException} when the response code is not SUCCESS;
+     *                   remoting and parsing failures propagate unchanged
+     */
+    public Triple<ConcurrentMap<String, Long>, DataVersion, Long> getDelayOffsetSnapShot(String brokerAddr)
+            throws Exception {
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_DELAY_OFFSET_SNAPSHOT, null);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        byte[] body = response.getBody();
+        if (body == null || body.length == 0) {
+            return null;
+        }
+
+        String jsonString = new String(body, StandardCharsets.UTF_8);
+        Triple<LinkedHashMap<String, Object>, DataVersion, Long> rawSnapshotData =
+                JSON.parseObject(
+                        jsonString,
+                        new TypeReference<Triple<LinkedHashMap<String, Object>, DataVersion, Long>>() {
+                        }
+                );
+
+        ConcurrentMap<String, Long> delayOffsetTable = new ConcurrentHashMap<>();
+        LinkedHashMap<String, Object> linkedMap = rawSnapshotData.getLeft();
+        if (linkedMap != null) {
+            for (Map.Entry<String, Object> entry : linkedMap.entrySet()) {
+                String key = entry.getKey();
+                Long offsetValue = JSON.to(Long.class, entry.getValue());
+                if (offsetValue == null) {
+                    LOGGER.warn("Skip delay offset without value in snapshot, key: {}", key);
+                    continue;
+                }
+                delayOffsetTable.put(key, offsetValue);
+            }
+        }
+
+        DataVersion dataVersion =
+                JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+        Long maxOffset =
+                JSON.to(Long.class, rawSnapshotData.getRight());
+
+        return Triple.of(delayOffsetTable, dataVersion, maxOffset);
+    }
+
+
+    /**
+     * Requests the topic-metrics snapshot from the given broker with a synchronous call
+     * (3000 ms timeout) and copies it into a concurrent map.
+     *
+     * <p>Entries with a {@code null} metric are logged and skipped, because
+     * {@link ConcurrentHashMap} rejects null values and a bulk {@code putAll} would otherwise
+     * abort the whole snapshot with a NullPointerException.
+     *
+     * @param brokerAddr address of the broker to query
+     * @return a triple of (key to metric, data version, max offset), or {@code null} when the
+     *         response has no body
+     * @throws Exception {@link MQBrokerException} when the response code is not SUCCESS;
+     *                   remoting and parsing failures propagate unchanged
+     */
+    public Triple<ConcurrentMap<String, TimerMetrics.Metric>, DataVersion, Long> getTopicMetricsSnapShot(String brokerAddr)
+            throws Exception {
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_METRICS_SNAPSHOT, null);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        byte[] body = response.getBody();
+        if (body == null || body.length == 0) {
+            return null;
+        }
+
+        String jsonString = new String(body, StandardCharsets.UTF_8);
+        Triple<LinkedHashMap<String, TimerMetrics.Metric>, DataVersion, Long> rawSnapshotData =
+                JSON.parseObject(
+                        jsonString,
+                        new TypeReference<Triple<LinkedHashMap<String, TimerMetrics.Metric>, DataVersion, Long>>() {
+                        }
+                );
+
+        ConcurrentMap<String, TimerMetrics.Metric> topicMetricsTable = new ConcurrentHashMap<>();
+        LinkedHashMap<String, TimerMetrics.Metric> linkedMap = rawSnapshotData.getLeft();
+        if (linkedMap != null) {
+            for (Map.Entry<String, TimerMetrics.Metric> entry : linkedMap.entrySet()) {
+                if (entry.getValue() == null) {
+                    LOGGER.warn("Skip topic metric without value in snapshot, key: {}", entry.getKey());
+                    continue;
+                }
+                topicMetricsTable.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        DataVersion dataVersion =
+                JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+        Long maxOffset =
+                JSON.to(Long.class, rawSnapshotData.getRight());
+
+        return Triple.of(topicMetricsTable, dataVersion, maxOffset);
+    }
+
+
+    /**
+     * Requests the message-request-mode snapshot from the given broker with a synchronous call
+     * (3000 ms timeout) and rebuilds it as nested concurrent maps.
+     *
+     * <p>Entries whose inner map is {@code null} in the payload are logged and skipped: copying
+     * a {@code null} map into a {@link ConcurrentHashMap} throws a NullPointerException, which
+     * would otherwise discard the whole snapshot.
+     *
+     * @param brokerAddr address of the broker to query
+     * @return a pair of (request-mode table, max offset), or {@code null} when the response
+     *         has no body
+     * @throws Exception {@link MQBrokerException} when the response code is not SUCCESS;
+     *                   remoting and parsing failures propagate unchanged
+     */
+    public Pair<ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>, Long> getSetMessageRequestModeSnapShot(String brokerAddr)
+            throws Exception {
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MESSAGE_REQUEST_MODE_SNAPSHOT, null);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        byte[] body = response.getBody();
+        if (body == null || body.length == 0) {
+            return null;
+        }
+
+        String jsonString = new String(body, StandardCharsets.UTF_8);
+        Pair<LinkedHashMap<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>>, Long> rawSnapshotData =
+                JSON.parseObject(
+                        jsonString,
+                        new TypeReference<Pair<LinkedHashMap<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>>, Long>>() {
+                        }
+                );
+
+        ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> requestModeMap = new ConcurrentHashMap<>();
+        LinkedHashMap<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>> linkedMap = rawSnapshotData.getObject1();
+        if (linkedMap != null) {
+            for (Map.Entry<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>> outerEntry : linkedMap.entrySet()) {
+                LinkedHashMap<String, SetMessageRequestModeRequestBody> innerMap = outerEntry.getValue();
+                if (innerMap == null) {
+                    // ConcurrentHashMap cannot be built from a null map; keep the other entries.
+                    LOGGER.warn("Skip message request mode entry without content in snapshot, key: {}", outerEntry.getKey());
+                    continue;
+                }
+                ConcurrentHashMap<String, SetMessageRequestModeRequestBody> innerConcurrentMap = new ConcurrentHashMap<>(innerMap);
+                requestModeMap.put(outerEntry.getKey(), innerConcurrentMap);
+            }
+        }
+
+        Long maxOffset =
+                JSON.to(Long.class, rawSnapshotData.getObject2());
+
+        return Pair.of(requestModeMap, maxOffset);
+    }
+
     private int getMaxPageSize() {
         return 2000;
     }
@@ -1634,5 +1905,4 @@
     private long getTimeoutMillis() {
         return TimeUnit.SECONDS.toMillis(60);
     }
-
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 298e239..511765d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -22,11 +22,18 @@
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.opentelemetry.api.common.Attributes;
+
+import java.io.File;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -405,6 +412,18 @@
                 return this.listAcl(ctx, request);
             case RequestCode.POP_ROLLBACK:
                 return this.transferPopToFsStore(ctx, request);
+            case RequestCode.GET_CONSUMER_OFFSET_SNAPSHOT:
+                return this.getConsumerOffsetSnapshot(ctx, request);
+            case RequestCode.GET_SUBSCRIPTION_GROUP_SNAPSHOT:
+                return this.getSubscriptionGroupSnapshot(ctx, request);
+            case RequestCode.GET_TOPIC_CONFIG_SNAPSHOT:
+                return this.getTopicConfigSnapshot(ctx, request);
+            case RequestCode.GET_DELAY_OFFSET_SNAPSHOT:
+                return this.getDelayOffsetSnapshot(ctx, request);
+            case RequestCode.GET_TOPIC_METRICS_SNAPSHOT:
+                return this.getTopicMetricsSnapshot(ctx, request);
+            case RequestCode.GET_MESSAGE_REQUEST_MODE_SNAPSHOT:
+                return this.getMessageRequestModeSnapshot(ctx, request);
             default:
                 return getUnknownCmdResponse(ctx, request);
         }
@@ -3408,4 +3427,141 @@
         }
         return response;
     }
+
+
+    /**
+     * Handles {@code GET_MESSAGE_REQUEST_MODE_SNAPSHOT}: returns the latest snapshot file stored
+     * under {@code MixAll.SNAPSHOT_NAME_MESSAGE_MODE}.
+     */
+    private RemotingCommand getMessageRequestModeSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+        return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_MESSAGE_MODE, "message_mode", "getMessageRequestModeSnapshot");
+    }
+
+    /**
+     * Handles {@code GET_TOPIC_METRICS_SNAPSHOT}: returns the latest snapshot file stored
+     * under {@code MixAll.SNAPSHOT_NAME_TIMER_METRICS}.
+     */
+    private RemotingCommand getTopicMetricsSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+        return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_TIMER_METRICS, "timer_metrics", "getTopicMetricsSnapshot");
+    }
+
+    /**
+     * Handles {@code GET_DELAY_OFFSET_SNAPSHOT}: returns the latest snapshot file stored
+     * under {@code MixAll.SNAPSHOT_NAME_DELAY_OFFSET}.
+     */
+    private RemotingCommand getDelayOffsetSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+        return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_DELAY_OFFSET, "delay_offset", "getDelayOffsetSnapshot");
+    }
+
+    /**
+     * Handles {@code GET_TOPIC_CONFIG_SNAPSHOT}: returns the latest snapshot file stored
+     * under {@code MixAll.SNAPSHOT_NAME_TOPIC_CONFIG}.
+     */
+    private RemotingCommand getTopicConfigSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+        return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_TOPIC_CONFIG, "topic_config", "getTopicConfigSnapshot");
+    }
+
+    /**
+     * Handles {@code GET_SUBSCRIPTION_GROUP_SNAPSHOT}: returns the latest snapshot file stored
+     * under {@code MixAll.SNAPSHOT_NAME_SUBSCRIPTION_GROUP}.
+     */
+    private RemotingCommand getSubscriptionGroupSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+        return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_SUBSCRIPTION_GROUP, "subscription_group", "getSubscriptionGroupSnapshot");
+    }
+
+    /**
+     * Handles {@code GET_CONSUMER_OFFSET_SNAPSHOT}: returns the latest snapshot file stored
+     * under {@code MixAll.SNAPSHOT_NAME_CONSUMER_OFFSET}.
+     */
+    private RemotingCommand getConsumerOffsetSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+        return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_CONSUMER_OFFSET, "consumer_offset", "getConsumerOffsetSnapshot");
+    }
+
+    /**
+     * Builds the response shared by all snapshot request handlers.
+     *
+     * <p>On success the body is the content returned by {@code readLatestSnapshotFile}. A missing
+     * snapshot and any exception both produce {@code SYSTEM_ERROR}; the remark is a "not found"
+     * text in the first case and the exception message in the second.
+     *
+     * @param snapshotName snapshot name passed to {@code readLatestSnapshotFile}
+     * @param remarkLabel  label inserted into the "not found" remark
+     * @param handlerName  name of the calling handler, used as the prefix of the error log line
+     * @return the populated response command
+     */
+    private RemotingCommand buildSnapshotResponse(String snapshotName, String remarkLabel, String handlerName) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        try {
+            byte[] body = readLatestSnapshotFile(snapshotName);
+            if (body != null) {
+                response.setCode(ResponseCode.SUCCESS);
+                response.setBody(body);
+            } else {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("No latest snapshot file found for " + remarkLabel + ".");
+            }
+        } catch (Exception e) {
+            LOGGER.error("{} error", handlerName, e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+        }
+        return response;
+    }
+
+
+    /**
+     * Reads the latest snapshot file for {@code snapshotName}.
+     *
+     * <p>Candidates are the entries of {@code <storePathRootDir>/snapshot} whose file name starts
+     * with {@code snapshotName + "_snapshot_"}; the one whose name compares greatest as a string
+     * is chosen.
+     *
+     * @param snapshotName snapshot name prefix to look for
+     * @return the file content, or {@code null} when the directory or a matching file does not exist
+     * @throws IOException if listing the directory or reading the file fails
+     */
+    private byte[] readLatestSnapshotFile(String snapshotName) throws IOException {
+        String snapshotDir = brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "snapshot";
+        Path dirPath = Paths.get(snapshotDir);
+
+        if (Files.notExists(dirPath)) {
+            LOGGER.warn("Snapshot directory does not exist: {}", dirPath);
+            return null;
+        }
+
+        Path latestFile;
+        // Files.list keeps the directory open until the stream is closed, so close it explicitly.
+        try (java.util.stream.Stream<Path> files = Files.list(dirPath)) {
+            latestFile = files
+                    .filter(path -> path.getFileName().toString().startsWith(snapshotName + "_snapshot_"))
+                    .max(Comparator.comparing(path -> path.getFileName().toString()))
+                    .orElse(null);
+        }
+
+        if (latestFile == null) {
+            LOGGER.warn("No snapshot file found for: {}", snapshotName);
+            return null;
+        }
+        return Files.readAllBytes(latestFile);
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index bec75fe..08855b1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -116,13 +116,28 @@
     }
 
     private void updateOffset(int delayLevel, long offset) {
-        this.offsetTable.put(delayLevel, offset);
+        this.offsetTable.compute(delayLevel, (key, existingConfig) -> {
+            if (existingConfig == null) {
+                notifyDelayOffsetCreated(delayLevel, offset);
+            } else {
+                notifyDelayOffsetUpdated(delayLevel, offset);
+            }
+            return offset;
+        });
         if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) {
             long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
             dataVersion.nextVersion(stateMachineVersion);
         }
     }
 
+    /**
+     * Passes a delay-level offset to {@code onCreated} of the broker's metadata change observer,
+     * using {@code RMQ_SYS_DELAY_OFFSET_SYNC} as the topic and the delay level as the string key.
+     */
+    private void notifyDelayOffsetCreated(int delayLevel, long offset) {
+        String levelKey = String.valueOf(delayLevel);
+        brokerController.getMetadataChangeObserver().onCreated(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, levelKey, offset);
+    }
+
+    /**
+     * Passes a delay-level offset to {@code onUpdated} of the broker's metadata change observer,
+     * using {@code RMQ_SYS_DELAY_OFFSET_SYNC} as the topic and the delay level as the string key.
+     */
+    private void notifyDelayOffsetUpdated(int delayLevel, long offset) {
+        String levelKey = String.valueOf(delayLevel);
+        brokerController.getMetadataChangeObserver().onUpdated(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, levelKey, offset);
+    }
+
     public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
         Long time = this.delayLevelTable.get(delayLevel);
         if (time != null) {
@@ -268,6 +283,18 @@
         return true;
     }
 
+    /**
+     * Copies {@code offsetTable} into a new {@link ConcurrentHashMap}, leaving out entries
+     * whose offset is {@code null}.
+     *
+     * @return a new map with the delay-level to offset entries seen during the copy
+     */
+    public ConcurrentMap<Integer, Long> deepCopyDelayOffsetTable() {
+        ConcurrentMap<Integer, Long> copy = new ConcurrentHashMap<>(this.offsetTable.size());
+        this.offsetTable.forEach((level, offset) -> {
+            if (offset != null) {
+                copy.put(level, offset);
+            }
+        });
+        return copy;
+    }
+
     @Override
     public String configFilePath() {
         return StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStore().getMessageStoreConfig()
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 2e31340..a371562 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,21 +17,37 @@
 package org.apache.rocketmq.broker.slave;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.TypeReference;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.sync.MetadataChangeInfo;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
@@ -39,6 +55,7 @@
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMetrics;
 
@@ -46,11 +63,335 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
     private volatile String masterAddr = null;
+    // True while start() is driving the incremental sync; start() does nothing when it is already true.
+    private boolean isIncrementSyncRunning = false;
+
+    // Lite pull consumer for the metadata sync topics; created by initIncrementSyncConsumer().
+    private DefaultLitePullConsumer incrementSyncConsumer;
+
+    // Sync progress per metadata topic. Each value is assigned either from a snapshot offset in
+    // syncAllMetadataSnapshots() or from the queue offset of a handled change message, and is
+    // compared with the queue's max offset in isLaggingTooFarBehind().
+    private long topicConfigSyncOffset = 0;
+    private long consumerOffsetSyncOffset = 0;
+    private long subscriptionGroupSyncOffset = 0;
+    private long delayOffsetSyncOffset = 0;
+    private long messageRequestModeSyncOffset = 0;
+    private long timerMetricsSyncOffset = 0;
+
 
     public SlaveSynchronize(BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
+    /**
+     * Starts incremental metadata synchronization: creates the pull consumer, then hands the
+     * snapshot sync to {@link #startIncrementalSync(Callable)}, which seeks the consumer and
+     * polls change messages. Does nothing when a sync is already marked as running.
+     * <p>
+     * NOTE: {@code startIncrementalSync} polls on the calling thread, so this method only
+     * returns once the consumer has stopped or a failure occurred. Failures are logged, not
+     * rethrown.
+     */
+    public void start() {
+        if (isIncrementSyncRunning) {
+            return;
+        }
+        isIncrementSyncRunning = true;
+        try {
+            initIncrementSyncConsumer();
+            // Callable.call() is allowed to throw, so the snapshot method is passed directly
+            // instead of being wrapped in a lambda that rethrows a RuntimeException.
+            startIncrementalSync(this::syncAllMetadataSnapshots);
+            // Reached only after the polling loop above has ended without an exception.
+            LOGGER.info("Slave synchronize service started successfully.");
+        } catch (Exception e) {
+            LOGGER.error("Failed to start slave synchronize service", e);
+        } finally {
+            // Clear the flag on every exit path; previously it stayed true after a normal
+            // loop exit, which turned every later start() into a no-op.
+            isIncrementSyncRunning = false;
+        }
+    }
+
+    /**
+     * Creates the lite pull consumer used for incremental sync, points it at the broker's
+     * configured name server address and subscribes it to the metadata sync topics. The timer
+     * metrics topic is subscribed only when the timer wheel is enabled in the message store config.
+     *
+     * @throws MQClientException propagated from the consumer calls
+     */
+    private void initIncrementSyncConsumer() throws MQClientException {
+        this.incrementSyncConsumer = new DefaultLitePullConsumer(MixAll.SLAVE_INCREMENT_SYNC_CONSUMER_GROUP);
+        this.incrementSyncConsumer.setNamesrvAddr(this.brokerController.getBrokerConfig().getNamesrvAddr());
+
+        // Topics that are subscribed unconditionally, in subscription order.
+        final String[] alwaysSyncedTopics = {
+            TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC,
+            TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC,
+            TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC,
+            TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC,
+            TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC
+        };
+        for (String syncTopic : alwaysSyncedTopics) {
+            this.incrementSyncConsumer.subscribe(syncTopic);
+        }
+
+        final boolean timerWheelEnabled = brokerController.getMessageStoreConfig().isTimerWheelEnable();
+        if (timerWheelEnabled) {
+            this.incrementSyncConsumer.subscribe(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC);
+        }
+    }
+
+    /**
+     * Runs the incremental sync loop on the calling thread until the consumer stops running.
+     * <p>
+     * The consumer is started and positioned at the offsets returned by the snapshot callback.
+     * Every polled message is applied through {@code handleMetadataMessage}; a failure on a single
+     * message is logged and the loop continues. Whenever {@code isLaggingTooFarBehind()} reports
+     * true, the callback is invoked again and the consumer is re-positioned. The consumer is shut
+     * down when the method exits, normally or not.
+     *
+     * @param syncSnapshotCallback performs a snapshot sync and returns the offset to seek to per queue
+     * @throws RuntimeException wrapping any exception raised outside per-message handling
+     */
+    public void startIncrementalSync(Callable<Map<MessageQueue, Long>> syncSnapshotCallback) {
+        try {
+            this.incrementSyncConsumer.start();
+            setConsumerOffset(syncSnapshotCallback.call());
+
+            while (this.incrementSyncConsumer.isRunning()) {
+                final List<MessageExt> polled = this.incrementSyncConsumer.poll(3 * 1000);
+                if (polled != null) {
+                    // An empty batch simply yields zero iterations.
+                    for (MessageExt metadataMsg : polled) {
+                        try {
+                            handleMetadataMessage(metadataMsg);
+                        } catch (Exception e) {
+                            LOGGER.error("Failed to handle metadata message", e);
+                        }
+                    }
+                }
+
+                if (!isLaggingTooFarBehind()) {
+                    continue;
+                }
+                LOGGER.info("Incremental sync consumer is lagging too far behind");
+                setConsumerOffset(syncSnapshotCallback.call());
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to start incremental sync consumer", e);
+            throw new RuntimeException(e);
+        } finally {
+            this.incrementSyncConsumer.shutdown();
+        }
+    }
+
+    /**
+     * Seeks the incremental sync consumer to the given offset for every queue in the map.
+     *
+     * @param consumerOffsets queue to seek -> offset to seek to
+     * @throws MQClientException propagated from the consumer's seek call
+     */
+    private void setConsumerOffset(Map<MessageQueue, Long> consumerOffsets) throws MQClientException {
+        for (Map.Entry<MessageQueue, Long> seekTarget : consumerOffsets.entrySet()) {
+            this.incrementSyncConsumer.seek(seekTarget.getKey(), seekTarget.getValue());
+        }
+    }
+
+    /**
+     * Tells whether incremental sync has fallen behind on any metadata sync queue, i.e. whether
+     * the gap between queue 0's max offset in the local message store and the recorded sync
+     * offset exceeds the configured lag threshold.
+     * <p>
+     * The timer metrics queue is only considered when the timer wheel is enabled, matching
+     * {@code initIncrementSyncConsumer()} and {@code syncAllMetadataSnapshots()}: with the timer
+     * wheel disabled that topic is never subscribed and its sync offset is never advanced, so
+     * including it could report lag forever.
+     *
+     * @return {@code true} if any checked queue exceeds the threshold, or if a max offset could
+     *         not be read (treated as lagging so that the caller re-syncs from a snapshot)
+     */
+    private boolean isLaggingTooFarBehind() {
+        final int threshold = brokerController.getBrokerConfig().getIncrementalSyncConsumerLagThreshold();
+        try {
+            if (lagExceeds(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, topicConfigSyncOffset, threshold)
+                || lagExceeds(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, consumerOffsetSyncOffset, threshold)
+                || lagExceeds(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, subscriptionGroupSyncOffset, threshold)
+                || lagExceeds(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, delayOffsetSyncOffset, threshold)
+                || lagExceeds(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, messageRequestModeSyncOffset, threshold)) {
+                return true;
+            }
+            return brokerController.getMessageStoreConfig().isTimerWheelEnable()
+                && lagExceeds(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, timerMetricsSyncOffset, threshold);
+        } catch (ConsumeQueueException e) {
+            // Keep the original decision (report lag) but leave a trace of why it was taken.
+            LOGGER.warn("Failed to read max offset of a metadata sync queue, falling back to snapshot sync", e);
+            return true;
+        }
+    }
+
+    /** Compares the unconsumed range of queue 0 of {@code topic} with {@code threshold}. */
+    private boolean lagExceeds(String topic, long syncedOffset, int threshold) throws ConsumeQueueException {
+        return brokerController.getMessageStore().getMaxOffsetInQueue(topic, 0) - syncedOffset > threshold;
+    }
+
+    /**
+     * Decodes one metadata change message and dispatches it by topic to the matching handler.
+     * After a handler returns, the message's queue offset is recorded as the sync offset of that
+     * topic. Messages from any other topic are ignored.
+     *
+     * @param msg change message whose body is a UTF-8 JSON encoded {@code MetadataChangeInfo}
+     */
+    private void handleMetadataMessage(MessageExt msg) {
+        final String json = new String(msg.getBody(), StandardCharsets.UTF_8);
+        final MetadataChangeInfo change = JSONObject.parseObject(json, MetadataChangeInfo.class);
+        final String topic = msg.getTopic();
+
+        if (topic.equals(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC)) {
+            handleTopicConfigChangeInfo(change);
+            this.topicConfigSyncOffset = msg.getQueueOffset();
+        } else if (topic.equals(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC)) {
+            handleConsumerOffsetChangeInfo(change);
+            this.consumerOffsetSyncOffset = msg.getQueueOffset();
+        } else if (topic.equals(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC)) {
+            handleSubscriptionGroupChangeInfo(change);
+            this.subscriptionGroupSyncOffset = msg.getQueueOffset();
+        } else if (topic.equals(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC)) {
+            handleDelayOffsetChangeInfo(change);
+            this.delayOffsetSyncOffset = msg.getQueueOffset();
+        } else if (topic.equals(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC)) {
+            handleMessageModeChangeInfo(change);
+            this.messageRequestModeSyncOffset = msg.getQueueOffset();
+        } else if (topic.equals(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC)) {
+            handleTimerMetricsChangeInfo(change);
+            this.timerMetricsSyncOffset = msg.getQueueOffset();
+        }
+    }
+
+    /**
+     * Applies a topic config change to the local topic config table: create/update stores the
+     * config parsed from the metadata value under the metadata key, delete removes that key.
+     * Other change types are ignored.
+     */
+    private void handleTopicConfigChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+        final String topicName = metadataChangeInfo.getMetadataKey();
+        switch (metadataChangeInfo.getChangeType()) {
+            case DELETED:
+                this.brokerController.getTopicConfigManager().getTopicConfigTable().remove(topicName);
+                return;
+            case CREATED:
+            case UPDATED:
+                this.brokerController.getTopicConfigManager().getTopicConfigTable()
+                    .put(topicName, JSON.parseObject(metadataChangeInfo.getMetadataValue(), TopicConfig.class));
+                return;
+            default:
+                return;
+        }
+    }
+
+    /**
+     * Merges the consumer offsets carried by a change message into the local offset table.
+     * The metadata value is parsed as a map of "topic@group" to (queue id -> offset); every
+     * parsed entry replaces the local entry stored under the same key. The change type is not
+     * inspected.
+     */
+    private void handleConsumerOffsetChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+        ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsets = JSONObject.parseObject(metadataChangeInfo.getMetadataValue(), new TypeReference<ConcurrentMap<String, ConcurrentMap<Integer, Long>>>() {
+        });
+        if (offsets == null || offsets.isEmpty()) {
+            // The parse result can be null (handleMessageModeChangeInfo guards the same way);
+            // putAll(null) would throw, so skip a payload that carries nothing to merge.
+            LOGGER.warn("Ignore consumer offset change message without offsets, key: {}", metadataChangeInfo.getMetadataKey());
+            return;
+        }
+        this.brokerController.getConsumerOffsetManager().getOffsetTable().putAll(offsets);
+    }
+
+    /**
+     * Applies a subscription group change to the local subscription group table: create/update
+     * stores the config parsed from the metadata value under the metadata key, delete removes
+     * that key. Other change types are ignored.
+     */
+    private void handleSubscriptionGroupChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+        final String groupName = metadataChangeInfo.getMetadataKey();
+        switch (metadataChangeInfo.getChangeType()) {
+            case DELETED:
+                this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().remove(groupName);
+                return;
+            case CREATED:
+            case UPDATED:
+                this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable()
+                    .put(groupName, JSON.parseObject(metadataChangeInfo.getMetadataValue(), SubscriptionGroupConfig.class));
+                return;
+            default:
+                return;
+        }
+    }
+
+    /**
+     * Applies a delay offset change: for create/update the metadata key is parsed as an integer
+     * and the metadata value as a long, and the pair is stored in the schedule message service's
+     * offset table. Every other change type is ignored.
+     */
+    private void handleDelayOffsetChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+        // Guard switch: only CREATED and UPDATED fall through to the update below.
+        switch (metadataChangeInfo.getChangeType()) {
+            case CREATED:
+            case UPDATED:
+                break;
+            default:
+                return;
+        }
+        this.brokerController.getScheduleMessageService().getOffsetTable().put(
+                Integer.valueOf(metadataChangeInfo.getMetadataKey()),
+                Long.parseLong(metadataChangeInfo.getMetadataValue()));
+    }
+
+    /**
+     * Applies a message request mode change: for create/update the metadata value is parsed as a
+     * map of request mode bodies, copied into a fresh concurrent map (left empty when the parse
+     * yields null) and stored under the metadata key. Every other change type is ignored.
+     */
+    private void handleMessageModeChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+        // Guard switch: only CREATED and UPDATED fall through to the update below.
+        switch (metadataChangeInfo.getChangeType()) {
+            case CREATED:
+            case UPDATED:
+                break;
+            default:
+                return;
+        }
+
+        final LinkedHashMap<String, SetMessageRequestModeRequestBody> parsed = JSONObject.parseObject(
+                metadataChangeInfo.getMetadataValue(),
+                new TypeReference<LinkedHashMap<String, SetMessageRequestModeRequestBody>>() {
+                });
+        final ConcurrentHashMap<String, SetMessageRequestModeRequestBody> modesForKey = new ConcurrentHashMap<>();
+        if (parsed != null) {
+            modesForKey.putAll(parsed);
+        }
+        this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().getMessageRequestModeMap()
+            .put(metadataChangeInfo.getMetadataKey(), modesForKey);
+    }
+
+    /**
+     * Applies a timer metrics change: for create/update the metric parsed from the metadata value
+     * is stored in the timing count table under the metadata key. Deletions and every other change
+     * type leave the table untouched.
+     */
+    private void handleTimerMetricsChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+        // Guard switch: only CREATED and UPDATED fall through to the update below.
+        switch (metadataChangeInfo.getChangeType()) {
+            case CREATED:
+            case UPDATED:
+                break;
+            default:
+                return;
+        }
+        this.brokerController.getTimerMessageStore().getTimerMetrics().getTimingCount()
+            .put(metadataChangeInfo.getMetadataKey(), JSON.parseObject(metadataChangeInfo.getMetadataValue(), TimerMetrics.Metric.class));
+    }
+
+
+    /**
+     * Fetches a snapshot of every metadata kind from the master, applies it locally and reports
+     * where incremental consumption should resume.
+     * <p>
+     * Topic config, consumer offset, subscription group and message request mode tables are
+     * replaced (and persisted) only when the snapshot offset is ahead of the locally recorded
+     * sync offset. Delay offsets and timer metrics are applied whenever a snapshot is returned.
+     * Timer metrics are fetched only when the timer wheel is enabled.
+     *
+     * @return for each sync queue whose snapshot offset was ahead of local progress, the offset
+     *         the incremental consumer should seek to; queues that were not advanced are absent
+     * @throws Exception propagated from the snapshot requests or from applying a snapshot
+     */
+    private Map<MessageQueue, Long> syncAllMetadataSnapshots() throws Exception {
+        LOGGER.info("Starting full metadata synchronization via snapshots...");
+        final Map<MessageQueue, Long> result = new HashMap<>();
+        // Read the volatile master address once so that every snapshot of this round comes
+        // from the same master (same idea as masterAddrBak in the sync*Full methods).
+        final String masterAddrBak = this.masterAddr;
+        final String brokerName = brokerController.getBrokerConfig().getBrokerName();
+
+        // NOTE(review): unlike the other snapshots, the DataVersion of the topic config
+        // snapshot (getMiddle()) is not applied anywhere - confirm whether that is intended.
+        Triple<ConcurrentMap<String, TopicConfig>, DataVersion, Long> topicConfigSnapshot =
+                this.brokerController.getBrokerOuterAPI().getTopicConfigSnapShot(masterAddrBak);
+        if (topicConfigSnapshot != null) {
+            if (this.topicConfigSyncOffset < topicConfigSnapshot.getRight()) {
+                this.brokerController.getTopicConfigManager().setTopicConfigTable(topicConfigSnapshot.getLeft());
+                this.brokerController.getTopicConfigManager().persist();
+                this.topicConfigSyncOffset = topicConfigSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, brokerName, 0), topicConfigSyncOffset);
+            }
+            // Fixed: this line used to print consumerOffsetSyncOffset.
+            LOGGER.info("Topic config synced. Pull will start from offset: {}", topicConfigSyncOffset);
+        }
+
+        Triple<ConcurrentMap<String, ConcurrentMap<Integer, Long>>, DataVersion, Long> consumerOffsetSnapshot =
+                this.brokerController.getBrokerOuterAPI().getConsumerOffsetSnapShot(masterAddrBak);
+        if (consumerOffsetSnapshot != null) {
+            if (this.consumerOffsetSyncOffset < consumerOffsetSnapshot.getRight()) {
+                this.brokerController.getConsumerOffsetManager().setOffsetTable(consumerOffsetSnapshot.getLeft());
+                this.brokerController.getConsumerOffsetManager().persist();
+                this.consumerOffsetSyncOffset = consumerOffsetSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, brokerName, 0), consumerOffsetSyncOffset);
+            }
+            this.brokerController.getConsumerOffsetManager().setDataVersion(consumerOffsetSnapshot.getMiddle());
+            LOGGER.info("Consumer offset synced. Pull will start from offset: {}", consumerOffsetSyncOffset);
+        }
+
+        Triple<ConcurrentMap<String, SubscriptionGroupConfig>, DataVersion, Long> subscriptionGroupSnapshot =
+                this.brokerController.getBrokerOuterAPI().getSubscriptionGroupSnapShot(masterAddrBak);
+        if (subscriptionGroupSnapshot != null) {
+            this.brokerController.getSubscriptionGroupManager().setDataVersion(subscriptionGroupSnapshot.getMiddle());
+            if (this.subscriptionGroupSyncOffset < subscriptionGroupSnapshot.getRight()) {
+                this.brokerController.getSubscriptionGroupManager().setSubscriptionGroupTable(subscriptionGroupSnapshot.getLeft());
+                this.brokerController.getSubscriptionGroupManager().persist();
+                this.subscriptionGroupSyncOffset = subscriptionGroupSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, brokerName, 0), subscriptionGroupSyncOffset);
+            }
+            LOGGER.info("Subscription group synced. Pull will start from offset: {}", subscriptionGroupSyncOffset);
+        }
+
+        Triple<ConcurrentMap<String, Long>, DataVersion, Long> delayOffsetSnapshot =
+                this.brokerController.getBrokerOuterAPI().getDelayOffsetSnapShot(masterAddrBak);
+        if (delayOffsetSnapshot != null) {
+            // The snapshot is written to the delay offset store file and then reloaded by the
+            // schedule message service.
+            Map<String, Object> jsonOutput = new LinkedHashMap<>();
+            jsonOutput.put("dataVersion", delayOffsetSnapshot.getMiddle());
+            jsonOutput.put("offsetTable", delayOffsetSnapshot.getLeft());
+            String jsonString = JSON.toJSONString(jsonOutput);
+            MixAll.string2File(jsonString, StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));
+            this.brokerController.getScheduleMessageService().loadWhenSyncDelayOffset();
+            if (this.delayOffsetSyncOffset < delayOffsetSnapshot.getRight()) {
+                this.delayOffsetSyncOffset = delayOffsetSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, brokerName, 0), delayOffsetSyncOffset);
+            }
+            LOGGER.info("Delay offset synced. Pull will start from offset: {}", delayOffsetSyncOffset);
+        }
+
+        Pair<ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>, Long> messageModeSnapshot =
+                this.brokerController.getBrokerOuterAPI().getSetMessageRequestModeSnapShot(masterAddrBak);
+        if (messageModeSnapshot != null) {
+            if (this.messageRequestModeSyncOffset < messageModeSnapshot.getObject2()) {
+                this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().setMessageRequestModeMap(messageModeSnapshot.getObject1());
+                this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().persist();
+                this.messageRequestModeSyncOffset = messageModeSnapshot.getObject2();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, brokerName, 0), messageRequestModeSyncOffset);
+            }
+            LOGGER.info("Message request mode synced. Pull will start from offset: {}", messageRequestModeSyncOffset);
+        }
+
+        if (brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
+            Triple<ConcurrentMap<String, TimerMetrics.Metric>, DataVersion, Long> timerMetricsSnapshot =
+                    this.brokerController.getBrokerOuterAPI().getTopicMetricsSnapShot(masterAddrBak);
+            if (timerMetricsSnapshot != null && null != brokerController.getMessageStore().getTimerMessageStore()) {
+                final TimerMetrics timerMetrics = this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics();
+                timerMetrics.getDataVersion().assignNewOne(timerMetricsSnapshot.getMiddle());
+                timerMetrics.getTimingCount().clear();
+                timerMetrics.getTimingCount().putAll(timerMetricsSnapshot.getLeft());
+                timerMetrics.persist();
+                // The sync offset only ever moves forward here, like every other metadata
+                // kind; the former unconditional re-assignment after this guard is removed.
+                if (this.timerMetricsSyncOffset < timerMetricsSnapshot.getRight()) {
+                    this.timerMetricsSyncOffset = timerMetricsSnapshot.getRight();
+                    result.put(new MessageQueue(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, brokerName, 0), timerMetricsSyncOffset);
+                }
+            }
+        }
+
+        // Logged last so that it also covers the timer metrics snapshot.
+        LOGGER.info("Full metadata synchronization completed.");
+        return result;
+    }
+
+
+
+
+    /** Returns the master address currently held by this synchronizer; {@code null} until one is set. */
     public String getMasterAddr() {
         return masterAddr;
     }
@@ -63,18 +404,19 @@
     }
 
+    /**
+     * Runs every full sync step in sequence: topic config, consumer offset, delay offset,
+     * subscription group config and message request mode, followed by timer metrics when the
+     * timer wheel is enabled in the message store config.
+     */
     public void syncAll() {
-        this.syncTopicConfig();
-        this.syncConsumerOffset();
-        this.syncDelayOffset();
-        this.syncSubscriptionGroupConfig();
-        this.syncMessageRequestMode();
+        this.syncTopicConfigFull();
+        this.syncConsumerOffsetFull();
+        this.syncDelayOffsetFull();
+        this.syncSubscriptionGroupConfigFull();
+        this.syncMessageRequestModeFull();
 
         if (brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
-            this.syncTimerMetrics();
+            this.syncTimerMetricsFull();
         }
     }
 
-    private void syncTopicConfig() {
+
+    private void syncTopicConfigFull() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
@@ -117,14 +459,15 @@
 
                     this.brokerController.getTopicQueueMappingManager().persist();
                 }
-                LOGGER.info("Update slave topic config from master, {}", masterAddrBak);
+                LOGGER.info("Full sync slave topic config from master, {}", masterAddrBak);
             } catch (Exception e) {
                 LOGGER.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
             }
         }
     }
 
-    private void syncConsumerOffset() {
+
+    private void syncConsumerOffsetFull() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
@@ -134,14 +477,14 @@
                         .putAll(offsetWrapper.getOffsetTable());
                 this.brokerController.getConsumerOffsetManager().getDataVersion().assignNewOne(offsetWrapper.getDataVersion());
                 this.brokerController.getConsumerOffsetManager().persist();
-                LOGGER.info("Update slave consumer offset from master, {}", masterAddrBak);
+                LOGGER.info("Full sync slave consumer offset from master, {}", masterAddrBak);
             } catch (Exception e) {
                 LOGGER.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
             }
         }
     }
 
-    private void syncDelayOffset() {
+    private void syncDelayOffsetFull() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
@@ -159,14 +502,14 @@
                         LOGGER.error("Persist file Exception, {}", fileName, e);
                     }
                 }
-                LOGGER.info("Update slave delay offset from master, {}", masterAddrBak);
+                LOGGER.info("Full sync slave delay offset from master, {}", masterAddrBak);
             } catch (Exception e) {
                 LOGGER.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
             }
         }
     }
 
-    private void syncSubscriptionGroupConfig() {
+    private void syncSubscriptionGroupConfigFull() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
@@ -197,7 +540,7 @@
                     subscriptionGroupManager.updateDataVersion();
                     // persist
                     subscriptionGroupManager.persist();
-                    LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak);
+                    LOGGER.info("Full sync slave Subscription Group from master, {}", masterAddrBak);
                 }
             } catch (Exception e) {
                 LOGGER.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
@@ -205,7 +548,7 @@
         }
     }
 
-    private void syncMessageRequestMode() {
+    private void syncMessageRequestModeFull() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
@@ -225,7 +568,7 @@
                 curMessageRequestModeMap.putAll(newMessageRequestModeMap);
                 // persist
                 messageRequestModeManager.persist();
-                LOGGER.info("Update slave Message Request Mode from master, {}", masterAddrBak);
+                LOGGER.info("Full sync slave Message Request Mode from master, {}", masterAddrBak);
             } catch (Exception e) {
                 LOGGER.error("SyncMessageRequestMode Exception, {}", masterAddrBak, e);
             }
@@ -251,7 +594,7 @@
         }
     }
 
-    private void syncTimerMetrics() {
+    private void syncTimerMetricsFull() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null) {
             try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index e860f29..c9901a7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -77,63 +77,82 @@
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
 
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
 
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
 
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
             subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
 
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
             subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
 
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
             subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
 
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
             subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
 
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
             subscriptionGroupConfig.setGroupName(MixAll.CID_SYS_RMQ_TRANS);
             subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-            putSubscriptionGroupConfig(subscriptionGroupConfig);
+            this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
         }
     }
 
     public SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
-        return this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
+        return this.subscriptionGroupTable.compute(subscriptionGroupConfig.getGroupName(), (key, existingConfig) -> {
+            if (existingConfig == null) {
+                notifySubscriptionGroupCreated(subscriptionGroupConfig);
+            } else {
+                notifySubscriptionGroupUpdated(subscriptionGroupConfig);
+            }
+            return subscriptionGroupConfig;
+        });
     }
 
     protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) {
-        return this.subscriptionGroupTable.putIfAbsent(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
+        final SubscriptionGroupConfig[] result = new SubscriptionGroupConfig[1];
+        this.subscriptionGroupTable.compute(subscriptionGroupConfig.getGroupName(), (key, existingConfig) -> {
+            if (existingConfig == null) {
+                notifySubscriptionGroupCreated(subscriptionGroupConfig);
+                result[0] = null;
+                return subscriptionGroupConfig;
+            } else {
+                notifySubscriptionGroupUpdated(subscriptionGroupConfig);
+                result[0] = existingConfig;
+                return existingConfig;
+            }
+        });
+        return result[0];
     }
 
     protected SubscriptionGroupConfig getSubscriptionGroupConfig(String groupName) {
@@ -141,13 +160,52 @@
     }
 
     protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
-        return this.subscriptionGroupTable.remove(groupName);
+        SubscriptionGroupConfig[] removedConfig = new SubscriptionGroupConfig[1];
+        this.subscriptionGroupTable.compute(groupName, (key, existingConfig) -> {
+            if (existingConfig != null) {
+                removedConfig[0] = existingConfig;
+                notifySubscriptionGroupDeleted(removedConfig[0]);
+            }
+            return null;
+        });
+        return removedConfig[0];
+    }
+
+    /** Passes a created subscription group, keyed by its group name, to the broker's metadata change observer. */
+    private void notifySubscriptionGroupCreated(SubscriptionGroupConfig config) {
+        final String groupName = config.getGroupName();
+        brokerController.getMetadataChangeObserver()
+            .onCreated(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, groupName, config);
+    }
+
+    /** Passes an updated subscription group, keyed by its group name, to the broker's metadata change observer. */
+    private void notifySubscriptionGroupUpdated(SubscriptionGroupConfig config) {
+        final String groupName = config.getGroupName();
+        brokerController.getMetadataChangeObserver()
+            .onUpdated(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, groupName, config);
+    }
+
+    /** Passes a deleted subscription group, keyed by its group name, to the broker's metadata change observer. */
+    private void notifySubscriptionGroupDeleted(SubscriptionGroupConfig config) {
+        final String groupName = config.getGroupName();
+        brokerController.getMetadataChangeObserver()
+            .onDeleted(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, groupName, config);
     }
 
+    /**
+     * Applies the config through {@code updateSubscriptionGroupConfigWithoutPersist} and then
+     * persists this manager's state.
+     *
+     * @param config subscription group config to apply
+     */
     public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
         updateSubscriptionGroupConfigWithoutPersist(config);
         this.persist();
     }
+    /**
+     * Builds a new table that maps every group name of the current subscription group table to
+     * a clone of its config. Entries with a null config are skipped. If a config cannot be
+     * cloned, the original instance is placed in the copy instead, so that entry is shared
+     * rather than copied.
+     *
+     * @return a new map, independent of the live table
+     */
+    public ConcurrentMap<String, SubscriptionGroupConfig> deepCopySubscriptionGroupTable() {
+        final ConcurrentMap<String, SubscriptionGroupConfig> copy =
+                new ConcurrentHashMap<>(this.subscriptionGroupTable.size());
+        this.subscriptionGroupTable.forEach((groupName, config) -> {
+            if (config == null) {
+                return;
+            }
+            SubscriptionGroupConfig copied;
+            try {
+                copied = config.clone();
+            } catch (CloneNotSupportedException e) {
+                // Fall back to the live instance rather than dropping the group.
+                copied = config;
+            }
+            copy.put(groupName, copied);
+        });
+        return copy;
+    }
 
     public void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
         Map<String, String> newAttributes = request(config);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMessageProducer.java b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMessageProducer.java
new file mode 100644
index 0000000..05fd8c5
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMessageProducer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.sync;
+
+import com.alibaba.fastjson2.JSON;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.sync.MetadataChangeInfo;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+
+/**
+ * Publishes metadata change events by writing them as messages into the broker controller's message store.
+ */
+public class SyncMessageProducer {
+    protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    /** Queue every change message is written to; fixed to 0 by the constructor. */
+    private final int syncQueueId;
+
+    public SyncMessageProducer(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.syncQueueId = 0;
+    }
+
+    /**
+     * Serializes {@code changeInfo} to JSON and stores it as a message on {@code targetTopic}.
+     * The metadata key is used as both the message keys and the message tags.
+     *
+     * @param targetTopic topic the change message is written to
+     * @param changeInfo  change event to publish
+     * @throws RuntimeException if building or storing the message fails, or if the store answers
+     *                          with any status other than {@code PUT_OK}
+     */
+    public void sendMetadataChange(final String targetTopic, MetadataChangeInfo changeInfo) {
+        try {
+            byte[] body = JSON.toJSONBytes(changeInfo);
+            MessageExtBrokerInner msg = new MessageExtBrokerInner();
+            msg.setBrokerName(brokerController.getBrokerConfig().getBrokerName());
+            msg.setTopic(targetTopic);
+            msg.setBody(body);
+            msg.setKeys(changeInfo.getMetadataKey());
+            msg.setTags(changeInfo.getMetadataKey());
+            msg.setBornTimestamp(System.currentTimeMillis());
+            msg.setStoreHost(brokerController.getStoreHost());
+            msg.setBornHost(brokerController.getStoreHost());
+            msg.setQueueId(this.syncQueueId);
+            msg.setMsgId(MessageClientIDSetter.createUniqID());
+            PutMessageResult result = this.brokerController.getMessageStore().putMessage(msg);
+            if (result.getPutMessageStatus() != PutMessageStatus.PUT_OK) {
+                throw new RuntimeException("send metadata change failed, topic: " + targetTopic + ", msgId: " + msg.getMsgId() + ", status: " + result.getPutMessageStatus().name());
+            }
+        } catch (RuntimeException e) {
+            // Already unchecked (this includes the status failure thrown above): log it and rethrow it
+            // unchanged, so the descriptive message is not buried inside a second RuntimeException.
+            LOG.error("send metadata change failed, topic: " + targetTopic, e);
+            throw e;
+        } catch (Exception e) {
+            LOG.error("send metadata change failed, topic: " + targetTopic, e);
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMetadataChangeObserver.java b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMetadataChangeObserver.java
new file mode 100644
index 0000000..9b5501e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMetadataChangeObserver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.sync;
+
+import com.alibaba.fastjson2.JSON;
+import org.apache.rocketmq.common.sync.MetadataChangeInfo;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
+
+/**
+ * {@link MetadataChangeObserver} that forwards every change event to a {@link SyncMessageProducer}.
+ * Metadata objects are serialized to JSON strings before being wrapped in a {@link MetadataChangeInfo}.
+ */
+public class SyncMetadataChangeObserver implements MetadataChangeObserver {
+
+    private final SyncMessageProducer producer;
+
+    public SyncMetadataChangeObserver(SyncMessageProducer producer) {
+        this.producer = producer;
+    }
+
+    @Override
+    public void onCreated(String targetTopic, String metadataKey, Object newMetadata) {
+        final String payload = JSON.toJSONString(newMetadata);
+        final MetadataChangeInfo changeInfo = MetadataChangeInfo.created(metadataKey, payload);
+        this.producer.sendMetadataChange(targetTopic, changeInfo);
+    }
+
+    @Override
+    public void onUpdated(String targetTopic, String metadataKey, Object newMetadata) {
+        final String payload = JSON.toJSONString(newMetadata);
+        this.producer.sendMetadataChange(targetTopic, MetadataChangeInfo.updated(metadataKey, payload));
+    }
+
+    @Override
+    public void onDeleted(String targetTopic, String metadataKey, Object oldMetadata) {
+        // NOTE(review): metadataKey is not forwarded here, unlike onCreated/onUpdated; only the
+        // serialized old metadata is handed to MetadataChangeInfo.deleted -- confirm this is intended.
+        final String payload = JSON.toJSONString(oldMetadata);
+        this.producer.sendMetadataChange(targetTopic, MetadataChangeInfo.deleted(payload));
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index ed46dfd..5841ecb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -92,7 +92,7 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(1);
             topicConfig.setWriteQueueNums(1);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
@@ -105,7 +105,7 @@
                     .getDefaultTopicQueueNums());
                 int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                 topicConfig.setPerm(perm);
-                putTopicConfig(topicConfig);
+                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
             }
         }
         {
@@ -114,7 +114,7 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(1024);
             topicConfig.setWriteQueueNums(1024);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
@@ -125,7 +125,7 @@
                 perm |= PermName.PERM_READ | PermName.PERM_WRITE;
             }
             topicConfig.setPerm(perm);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
 
@@ -139,7 +139,7 @@
             topicConfig.setReadQueueNums(1);
             topicConfig.setWriteQueueNums(1);
             topicConfig.setPerm(perm);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT;
@@ -147,7 +147,7 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(1);
             topicConfig.setWriteQueueNums(1);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
@@ -155,7 +155,7 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
             topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
@@ -164,7 +164,7 @@
                 TopicValidator.addSystemTopic(topic);
                 topicConfig.setReadQueueNums(1);
                 topicConfig.setWriteQueueNums(1);
-                putTopicConfig(topicConfig);
+                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
             }
         }
         {
@@ -173,7 +173,7 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(1);
             topicConfig.setWriteQueueNums(1);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             // PopAckConstants.REVIVE_TOPIC
@@ -182,7 +182,7 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum());
             topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum());
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             // sync broker member group topic
@@ -192,7 +192,7 @@
             topicConfig.setReadQueueNums(1);
             topicConfig.setWriteQueueNums(1);
             topicConfig.setPerm(PermName.PERM_INHERIT);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
         {
             // TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
@@ -201,7 +201,7 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(1);
             topicConfig.setWriteQueueNums(1);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
 
         {
@@ -211,9 +211,62 @@
             TopicValidator.addSystemTopic(topic);
             topicConfig.setReadQueueNums(1);
             topicConfig.setWriteQueueNums(1);
-            putTopicConfig(topicConfig);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
         }
-
+        {
+            // TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC
+            String topic = TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            TopicValidator.addSystemTopic(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC
+            String topic = TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            TopicValidator.addSystemTopic(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC
+            String topic = TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            TopicValidator.addSystemTopic(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC
+            String topic = TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            TopicValidator.addSystemTopic(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC
+            String topic = TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            TopicValidator.addSystemTopic(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC
+            String topic = TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            TopicValidator.addSystemTopic(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
         {
             if (this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
                 String topic = TimerMessageStore.TIMER_TOPIC;
@@ -227,15 +280,43 @@
     }
 
     public TopicConfig putTopicConfig(TopicConfig topicConfig) {
-        return this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        final TopicConfig[] previous = new TopicConfig[1]; // holder: a lambda cannot assign a local variable
+        this.topicConfigTable.compute(topicConfig.getTopicName(), (key, existingConfig) -> {
+            previous[0] = existingConfig; // the mapping being replaced, or null when the topic is new
+            if (existingConfig == null) {
+                notifyTopicCreated(topicConfig);
+            } else {
+                notifyTopicUpdated(topicConfig);
+            }
+            return topicConfig;
+        });
+        return previous[0]; // same result as the former Map.put: the previous config, or null
     }
 
+    private void notifyTopicCreated(TopicConfig topicConfig) {
+        brokerController.getMetadataChangeObserver().onCreated(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, topicConfig.getTopicName(), topicConfig);
+    }
+
+    private void notifyTopicUpdated(TopicConfig topicConfig) {
+        brokerController.getMetadataChangeObserver().onUpdated(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, topicConfig.getTopicName(), topicConfig);
+    }
+
+    private void notifyTopicDeleted(TopicConfig topicConfig) {
+        brokerController.getMetadataChangeObserver().onDeleted(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, topicConfig.getTopicName(), topicConfig);
+    }
+
     protected TopicConfig getTopicConfig(String topicName) {
         return this.topicConfigTable.get(topicName);
     }
 
     protected TopicConfig removeTopicConfig(String topicName) {
-        return this.topicConfigTable.remove(topicName);
+        final TopicConfig[] removed = new TopicConfig[1];
+        this.topicConfigTable.computeIfPresent(topicName, (key, existingConfig) -> {
+            removed[0] = existingConfig;
+            notifyTopicDeleted(existingConfig);
+            return null; // a null result removes the mapping
+        });
+        return removed[0];
     }
 
     public TopicConfig selectTopicConfig(final String topic) {
@@ -759,5 +840,31 @@
     }
 
 
-
+    /**
+     * Creates a snapshot of the topic config table whose values are copies made with
+     * {@link TopicConfig#clone()}.
+     * <p>
+     * Should cloning an entry fail, the live instance is placed in the snapshot instead, so that
+     * entry stays shared with the original table.
+     *
+     * @return a new map from topic name to the copied (or, on clone failure, shared) config
+     */
+    public ConcurrentMap<String, TopicConfig> deepCopyTopicConfigTable() {
+        final ConcurrentMap<String, TopicConfig> snapshot =
+                new ConcurrentHashMap<>(this.topicConfigTable.size());
+        for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
+            final TopicConfig liveConfig = entry.getValue();
+            if (liveConfig == null) {
+                continue;
+            }
+            TopicConfig copy;
+            try {
+                copy = liveConfig.clone();
+            } catch (CloneNotSupportedException e) {
+                copy = liveConfig;
+            }
+            snapshot.put(entry.getKey(), copy);
+        }
+        return snapshot;
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
index 4ff8a81..364d5eb 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
@@ -20,6 +20,7 @@
 import java.io.File;
 import java.io.IOException;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
 import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
@@ -52,6 +53,9 @@
     @Mock
     private MessageStore messageStore;
 
+    @Mock
+    private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
     @Rule
     public TemporaryFolder tf = new TemporaryFolder();
 
@@ -67,7 +71,7 @@
         BrokerConfig brokerConfig = new BrokerConfig();
         brokerConfig.setAutoCreateSubscriptionGroup(false);
         Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
-
+        Mockito.doReturn(syncMetadataChangeObserver).when(controller).getMetadataChangeObserver();
         Mockito.doReturn(messageStore).when(controller).getMessageStore();
         Mockito.doReturn(1L).when(messageStore).getStateMachineVersion();
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
index 731a1f5..d07f34b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.store.MessageStore;
@@ -52,6 +53,9 @@
     @Rule
     public TemporaryFolder tf = new TemporaryFolder();
 
+    @Mock
+    private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
     @After
     public void cleanUp() {
         if (null != configStorage) {
@@ -67,7 +71,7 @@
         messageStoreConfig = new MessageStoreConfig();
         Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig();
         Mockito.doReturn(messageStore).when(controller).getMessageStore();
-
+        Mockito.doReturn(syncMetadataChangeObserver).when(controller).getMetadataChangeObserver();
         File configStoreDir = tf.newFolder();
         messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 4fbec13..c89f2b9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
@@ -59,6 +60,9 @@
     @Mock
     private DefaultMessageStore defaultMessageStore;
 
+    @Mock
+    private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
     @Before
     public void init() {
         if (notToBeExecuted()) {
@@ -66,6 +70,7 @@
         }
         BrokerConfig brokerConfig = new BrokerConfig();
         Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        Mockito.doReturn(syncMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setStorePathRootDir(basePath);
         Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
index 3c975a5..a0a3467 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.SubscriptionGroupAttributes;
 import org.apache.rocketmq.common.attribute.BooleanAttribute;
@@ -60,6 +61,9 @@
     private BrokerController brokerControllerMock;
     private SubscriptionGroupManager subscriptionGroupManager;
 
+    @Mock
+    private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
     @Before
     public void before() {
         SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
@@ -71,6 +75,9 @@
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setStorePathRootDir(basePath);
         Mockito.lenient().when(brokerControllerMock.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        if (!notToBeExecuted()) {
+            Mockito.doReturn(syncMetadataChangeObserver).when(brokerControllerMock).getMetadataChangeObserver();
+        }
     }
 
     @After
@@ -238,4 +245,4 @@
         }
     }
 
-}
\ No newline at end of file
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index fa3ef95..598f871 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -34,6 +34,7 @@
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.attribute.EnumAttribute;
 import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -61,6 +62,9 @@
     private BrokerController brokerController;
 
     @Mock
+    private NoopMetadataChangeObserver noopMetadataChangeObserver;
+
+    @Mock
     private DefaultMessageStore defaultMessageStore;
 
     @Before
@@ -76,6 +80,7 @@
         Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
         topicConfigManager = new RocksDBTopicConfigManager(brokerController);
+        Mockito.doReturn(noopMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
         topicConfigManager.load();
     }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
index e925ed4..2fe205d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -60,6 +61,9 @@
     @Mock
     private DefaultMessageStore defaultMessageStore;
 
+    @Mock
+    private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
     @Before
     public void init() {
         if (notToBeExecuted()) {
@@ -72,6 +76,7 @@
         Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+        Mockito.doReturn(syncMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
     }
 
     @After
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 5b2ea0b..7ebfec1 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
@@ -64,6 +65,9 @@
     @Mock
     private DefaultMessageStore defaultMessageStore;
 
+    @Mock
+    private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
     @Before
     public void init() {
         BrokerConfig brokerConfig = new BrokerConfig();
@@ -74,6 +78,7 @@
         Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
         topicConfigManager = new TopicConfigManager(brokerController);
+        Mockito.doReturn(syncMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
     }
 
     @Test
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 04828da..6d83154 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -411,7 +411,7 @@
     private boolean usePIDColdCtrStrategy = true;
     private long cgColdReadThreshold = 3 * 1024 * 1024;
     private long globalColdReadThreshold = 100 * 1024 * 1024;
-    
+
     /**
      * The interval to fetch namesrv addr, default value is 10 second
      */
@@ -479,6 +479,38 @@
 
     private boolean enableCreateSysGroup = true;
 
+    private boolean allowMetadataIncrementalSync = false;
+
+    private int snapshotIntervalSeconds = 600;
+
+    private int metadataIncrementalSyncThreadPoolNums = 1;
+
+    private int incrementalSyncConsumerLagThreshold = 50;
+
+    public int getMetadataIncrementalSyncThreadPoolNums() {
+        return metadataIncrementalSyncThreadPoolNums;
+    }
+
+    public void setMetadataIncrementalSyncThreadPoolNums(int metadataIncrementalSyncThreadPoolNums) {
+        this.metadataIncrementalSyncThreadPoolNums = metadataIncrementalSyncThreadPoolNums;
+    }
+
+    public int getSnapshotIntervalSeconds() {
+        return snapshotIntervalSeconds;
+    }
+
+    public void setSnapshotIntervalSeconds(int snapshotIntervalSeconds) {
+        this.snapshotIntervalSeconds = snapshotIntervalSeconds;
+    }
+
+    public boolean isAllowMetadataIncrementalSync() {
+        return allowMetadataIncrementalSync;
+    }
+
+    public void setAllowMetadataIncrementalSync(boolean allowMetadataIncrementalSync) {
+        this.allowMetadataIncrementalSync = allowMetadataIncrementalSync;
+    }
+
     public String getConfigBlackList() {
         return configBlackList;
     }
@@ -1963,11 +1995,11 @@
     public void setUseStaticSubscription(boolean useStaticSubscription) {
         this.useStaticSubscription = useStaticSubscription;
     }
-    
+
     public long getFetchNamesrvAddrInterval() {
         return fetchNamesrvAddrInterval;
     }
-    
+
     public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
         this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
     }
@@ -2163,4 +2195,12 @@
     public void setSplitMetadataSize(int splitMetadataSize) {
         this.splitMetadataSize = splitMetadataSize;
     }
+
+    public int getIncrementalSyncConsumerLagThreshold() {
+        return incrementalSyncConsumerLagThreshold;
+    }
+
+    public void setIncrementalSyncConsumerLagThreshold(int incrementalSyncConsumerLagThreshold) {
+        this.incrementalSyncConsumerLagThreshold = incrementalSyncConsumerLagThreshold;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 00006ac..5f37fec 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -120,13 +120,27 @@
     public static final String ZONE_MODE = "__ZONE_MODE";
     public final static String RPC_REQUEST_HEADER_NAMESPACED_FIELD = "nsd";
     public final static String RPC_REQUEST_HEADER_NAMESPACE_FIELD = "ns";
+    public static final String SLAVE_INCREMENT_SYNC_CONSUMER_GROUP = "slave_sync_consumer_group";
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
     public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
     public static final String METADATA_SCOPE_GLOBAL = "__global__";
     public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__syslo__none__";
     public static final String MULTI_PATH_SPLITTER = System.getProperty("rocketmq.broker.multiPathSplitter", ",");
-
+    public static final String SNAPSHOT_NAME_TOPIC_CONFIG = "topic_config";
+    public static final String SNAPSHOT_NAME_SUBSCRIPTION_GROUP = "subscription_group";
+    public static final String SNAPSHOT_NAME_CONSUMER_OFFSET = "consumer_offset";
+    public static final String SNAPSHOT_NAME_DELAY_OFFSET = "delay_offset";
+    public static final String SNAPSHOT_NAME_MESSAGE_MODE = "message_mode";
+    public static final String SNAPSHOT_NAME_TIMER_METRICS = "timer_metrics";
+    public static final Set<String> SNAPSHOT_NAMES = ImmutableSet.of(
+            SNAPSHOT_NAME_TOPIC_CONFIG,
+            SNAPSHOT_NAME_SUBSCRIPTION_GROUP,
+            SNAPSHOT_NAME_CONSUMER_OFFSET,
+            SNAPSHOT_NAME_DELAY_OFFSET,
+            SNAPSHOT_NAME_MESSAGE_MODE,
+            SNAPSHOT_NAME_TIMER_METRICS
+    );
     private static final String OS = System.getProperty("os.name").toLowerCase();
 
     private static final Set<String> PREDEFINE_GROUP_SET = ImmutableSet.of(
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index 0bf6490..586a756 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -27,7 +27,7 @@
 
 import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;
 
-public class TopicConfig {
+public class TopicConfig implements Cloneable {
     private static final String SEPARATOR = " ";
     public static int defaultReadQueueNums = 16;
     public static int defaultWriteQueueNums = 16;
@@ -270,4 +270,23 @@
             + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + order
             + ", attributes=" + attributes + "]";
     }
+
+    /**
+     * Creates a copy of this topic config whose attribute map is independent of the original.
+     * <p>
+     * All fields are first copied by {@code super.clone()}. When {@code attributes} is non-null it is then
+     * replaced on the copy by a new {@link HashMap} holding the same entries, so adding or removing
+     * attributes on the copy does not touch this instance; a null map stays null.
+     *
+     * @return the copied topic config
+     * @throws CloneNotSupportedException propagated from {@code super.clone()}
+     */
+    @Override
+    public TopicConfig clone() throws CloneNotSupportedException {
+        TopicConfig clone = (TopicConfig) super.clone();
+        if (this.attributes != null) {
+            clone.setAttributes(new HashMap<>(this.attributes));
+        }
+        return clone;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeInfo.java b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeInfo.java
new file mode 100644
index 0000000..0c56039
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeInfo.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sync;
+
+/**
+ * Describes a single metadata change: the entry key, its value (absent for deletions), the kind of change
+ * and the wall-clock time at which the record was built.
+ * <p>
+ * Records are usually obtained from {@link #created}, {@link #updated} or {@link #deleted}. The implicit
+ * no-arg constructor and the setters remain available so a record can also be populated field by field.
+ */
+public class MetadataChangeInfo {
+
+    /** Kind of change a record describes. */
+    public enum ChangeType {
+        CREATED,
+        UPDATED,
+        DELETED
+    }
+
+    private String metadataKey;
+
+    private String metadataValue;
+
+    private long timestamp;
+
+    private ChangeType changeType;
+
+    /**
+     * Builds a {@link ChangeType#CREATED} record stamped with {@link System#currentTimeMillis()}.
+     *
+     * @param metadataKey key of the created entry
+     * @param metadataValue value of the created entry
+     * @return the populated record
+     * @throws IllegalArgumentException if the key or the value is null
+     */
+    public static MetadataChangeInfo created(String metadataKey, String metadataValue) {
+        return withValue(ChangeType.CREATED, metadataKey, metadataValue);
+    }
+
+    /**
+     * Builds a {@link ChangeType#UPDATED} record stamped with {@link System#currentTimeMillis()}.
+     *
+     * @param metadataKey key of the updated entry
+     * @param metadataValue new value of the entry
+     * @return the populated record
+     * @throws IllegalArgumentException if the key or the value is null
+     */
+    public static MetadataChangeInfo updated(String metadataKey, String metadataValue) {
+        return withValue(ChangeType.UPDATED, metadataKey, metadataValue);
+    }
+
+    /**
+     * Builds a {@link ChangeType#DELETED} record stamped with {@link System#currentTimeMillis()}; the value is
+     * left unset.
+     *
+     * @param metadataKey key of the deleted entry
+     * @return the populated record
+     * @throws IllegalArgumentException if the key is null
+     */
+    public static MetadataChangeInfo deleted(String metadataKey) {
+        if (metadataKey == null) {
+            throw new IllegalArgumentException("Metadata key cannot be null");
+        }
+        MetadataChangeInfo info = new MetadataChangeInfo();
+        info.metadataKey = metadataKey;
+        info.changeType = ChangeType.DELETED;
+        info.timestamp = System.currentTimeMillis();
+        return info;
+    }
+
+    /** Shared construction path of the factories that carry both a key and a value. */
+    private static MetadataChangeInfo withValue(ChangeType type, String key, String value) {
+        if (key == null || value == null) {
+            throw new IllegalArgumentException("Metadata key and value cannot be null");
+        }
+        MetadataChangeInfo info = new MetadataChangeInfo();
+        info.metadataKey = key;
+        info.metadataValue = value;
+        info.changeType = type;
+        info.timestamp = System.currentTimeMillis();
+        return info;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public ChangeType getChangeType() {
+        return changeType;
+    }
+
+    public void setChangeType(ChangeType changeType) {
+        this.changeType = changeType;
+    }
+
+    public String getMetadataKey() {
+        return metadataKey;
+    }
+
+    public void setMetadataKey(String metadataKey) {
+        this.metadataKey = metadataKey;
+    }
+
+    public String getMetadataValue() {
+        return metadataValue;
+    }
+
+    public void setMetadataValue(String metadataValue) {
+        this.metadataValue = metadataValue;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeObserver.java b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeObserver.java
new file mode 100644
index 0000000..a1ddd62
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeObserver.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sync;
+
+/**
+ * Callback through which a metadata holder reports that an entry was created, updated or deleted.
+ * <p>
+ * NOTE(review): the only caller visible in this change, {@code TimerMetrics}, passes one of the
+ * {@code TopicValidator.RMQ_SYS_*_SYNC} topic names as {@code targetTopic}; presumably every caller does -
+ * confirm before relying on it.
+ */
+public interface MetadataChangeObserver {
+
+    /**
+     * Called when a metadata entry has been created.
+     *
+     * @param targetTopic topic associated with the change by the caller
+     * @param metadataKey key identifying the created entry
+     * @param newMetadata the created metadata object
+     */
+    void onCreated(String targetTopic,String metadataKey, Object newMetadata);
+
+    /**
+     * Called when a metadata entry has been updated.
+     *
+     * @param targetTopic topic associated with the change by the caller
+     * @param metadataKey key identifying the updated entry
+     * @param newMetadata the metadata object after the update
+     */
+    void onUpdated(String targetTopic,String metadataKey, Object newMetadata);
+
+    /**
+     * Called when a metadata entry has been deleted.
+     *
+     * @param targetTopic topic associated with the change by the caller
+     * @param metadataKey key identifying the deleted entry
+     * @param oldMetadata the metadata object that was removed
+     */
+    void onDeleted(String targetTopic,String metadataKey, Object oldMetadata);
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/sync/NoopMetadataChangeObserver.java b/common/src/main/java/org/apache/rocketmq/common/sync/NoopMetadataChangeObserver.java
new file mode 100644
index 0000000..4240b10
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sync/NoopMetadataChangeObserver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sync;
+
+/**
+ * {@link MetadataChangeObserver} whose callbacks do nothing, for callers that need an observer instance but no
+ * change notification (the store tests in this change construct {@code TimerMetrics} with it).
+ */
+public class NoopMetadataChangeObserver implements MetadataChangeObserver {
+
+
+    @Override
+    public void onCreated(String targetTopic, String metadataKey, Object newMetadata) {
+        // Intentionally empty: creations are ignored.
+    }
+
+    @Override
+    public void onUpdated(String targetTopic, String metadataKey, Object newMetadata) {
+        // Intentionally empty: updates are ignored.
+    }
+
+    @Override
+    public void onDeleted(String targetTopic, String metadataKey, Object oldMetadata) {
+        // Intentionally empty: deletions are ignored.
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
index 47d45c6..05d77ee 100644
--- a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
@@ -33,7 +33,14 @@
     public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
     public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
     public static final String RMQ_SYS_ROCKSDB_OFFSET_TOPIC = "CHECKPOINT_TOPIC";
-
+    // System topics introduced for master-slave incremental metadata sync, one per metadata kind.
+    // NOTE(review): only RMQ_SYS_TIMER_METRICS_SYNC has a caller visible here (TimerMetrics) - confirm the rest.
+    public static final String RMQ_SYS_TOPIC_CONFIG_SYNC = "RMQ_SYS_TOPIC_CONFIG_SYNC";
+    public static final String RMQ_SYS_CONSUMER_OFFSET_SYNC = "RMQ_SYS_CONSUMER_OFFSET_SYNC";
+    public static final String RMQ_SYS_DELAY_OFFSET_SYNC = "RMQ_SYS_DELAY_OFFSET_SYNC";
+    public static final String RMQ_SYS_SUBSCRIPTION_GROUP_SYNC = "RMQ_SYS_SUBSCRIPTION_GROUP_SYNC";
+    public static final String RMQ_SYS_MESSAGE_MODE_SYNC = "RMQ_SYS_MESSAGE_MODE_SYNC";
+    public static final String RMQ_SYS_TIMER_METRICS_SYNC = "RMQ_SYS_TIMER_METRICS_SYNC";
     public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
     public static final String SYNC_BROKER_MEMBER_GROUP_PREFIX = SYSTEM_TOPIC_PREFIX + "SYNC_BROKER_MEMBER_";
 
@@ -64,6 +69,13 @@
         SYSTEM_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC);
         SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
         SYSTEM_TOPIC_SET.add(RMQ_SYS_ROCKSDB_OFFSET_TOPIC);
+        SYSTEM_TOPIC_SET.add(RMQ_SYS_TOPIC_CONFIG_SYNC);
+        SYSTEM_TOPIC_SET.add(RMQ_SYS_CONSUMER_OFFSET_SYNC);
+        SYSTEM_TOPIC_SET.add(RMQ_SYS_DELAY_OFFSET_SYNC);
+        SYSTEM_TOPIC_SET.add(RMQ_SYS_SUBSCRIPTION_GROUP_SYNC);
+        SYSTEM_TOPIC_SET.add(RMQ_SYS_MESSAGE_MODE_SYNC);
+        SYSTEM_TOPIC_SET.add(RMQ_SYS_TIMER_METRICS_SYNC);
+
 
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC);
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TRANS_HALF_TOPIC);
@@ -71,6 +83,13 @@
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC);
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
+        NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TOPIC_CONFIG_SYNC);
+        NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_CONSUMER_OFFSET_SYNC);
+        NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_DELAY_OFFSET_SYNC);
+        NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SUBSCRIPTION_GROUP_SYNC);
+        NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_MESSAGE_MODE_SYNC);
+        NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TIMER_METRICS_SYNC);
+
 
         // regex: ^[%|a-zA-Z0-9_-]+$
         // %
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 8b2749e..6cf0b17 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -202,9 +202,9 @@
     public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
 
     public static final int ADD_WRITE_PERM_OF_BROKER = 327;
-    
+
     public static final int GET_ALL_PRODUCER_INFO = 328;
-    
+
     public static final int DELETE_EXPIRED_COMMITLOG = 329;
 
     public static final int GET_TOPIC_CONFIG = 351;
@@ -239,11 +239,20 @@
 
     public static final int RESET_MASTER_FLUSH_OFFSET = 908;
 
-    /**
-     * Controller code
-     */
-    public static final int CONTROLLER_ALTER_SYNC_STATE_SET = 1001;
+    // Request codes for fetching metadata snapshots (master-slave incremental metadata sync).
+    // NOTE(review): GET_TOPIC_METRICS_SNAPSHOT presumably maps to the timer metrics snapshot
+    // (compare MixAll.SNAPSHOT_NAME_TIMER_METRICS) - confirm against the request processor.
+    public static final int GET_CONSUMER_OFFSET_SNAPSHOT = 614;
+    public static final int GET_SUBSCRIPTION_GROUP_SNAPSHOT = 615;
+    public static final int GET_TOPIC_CONFIG_SNAPSHOT = 616;
+    public static final int GET_DELAY_OFFSET_SNAPSHOT = 617;
+    public static final int GET_TOPIC_METRICS_SNAPSHOT = 618;
+    public static final int GET_MESSAGE_REQUEST_MODE_SNAPSHOT = 619;
 
+    /**
+     * Controller code
+     */
+    public static final int CONTROLLER_ALTER_SYNC_STATE_SET = 1001;
     public static final int CONTROLLER_ELECT_MASTER = 1002;
 
     public static final int CONTROLLER_REGISTER_BROKER = 1003;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java
index 31aecd0..b1991ac 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java
@@ -20,7 +20,7 @@
 import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class SetMessageRequestModeRequestBody extends RemotingSerializable {
+public class SetMessageRequestModeRequestBody extends RemotingSerializable implements Cloneable {
 
     private String topic;
 
@@ -67,4 +67,15 @@
     public void setPopShareQueueNum(int popShareQueueNum) {
         this.popShareQueueNum = popShareQueueNum;
     }
+
+    /**
+     * Returns the copy produced by {@code super.clone()}; no field is copied or replaced beyond that.
+     *
+     * @return the copied request body
+     * @throws CloneNotSupportedException propagated from {@code super.clone()}
+     */
+    @Override
+    public SetMessageRequestModeRequestBody clone() throws CloneNotSupportedException {
+        return (SetMessageRequestModeRequestBody)super.clone();
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
index c9c2a80..6e34bfa 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
@@ -19,12 +19,13 @@
 
 import com.google.common.base.MoreObjects;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.rocketmq.common.MixAll;
 
-public class SubscriptionGroupConfig {
+public class SubscriptionGroupConfig implements Cloneable {
 
     private String groupName;
 
@@ -222,6 +223,29 @@
     }
 
+    /**
+     * Creates a copy of this group config that owns its own collections.
+     * <p>
+     * All fields are first copied by {@code super.clone()}. A non-null {@code subscriptionDataSet} is then
+     * replaced on the copy by a new {@link HashSet} and a non-null {@code attributes} by a new
+     * {@link HashMap}, each holding the same elements as the original. The elements themselves are not
+     * copied, so they stay shared between both instances.
+     *
+     * @return the copied group config
+     * @throws CloneNotSupportedException propagated from {@code super.clone()}
+     */
     @Override
+    public SubscriptionGroupConfig clone() throws CloneNotSupportedException {
+        SubscriptionGroupConfig clone = (SubscriptionGroupConfig) super.clone();
+        if (subscriptionDataSet != null) {
+            clone.setSubscriptionDataSet(new HashSet<>(subscriptionDataSet));
+        }
+        if (attributes != null) {
+            clone.setAttributes(new HashMap<>(attributes));
+        }
+        return clone;
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("groupName", groupName)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index ba72404..fc4f5a0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -41,6 +41,7 @@
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -52,8 +53,13 @@
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private transient final Lock lock = new ReentrantLock();
 
+    // Observer notified about changes to timingCount entries.
+    // Has no initializer; it is assigned by the (configPath, changeNotifier) constructor.
+    private transient MetadataChangeObserver changeNotifier;
+
     private final ConcurrentMap<String, Metric> timingCount = new ConcurrentHashMap<>(1024);
 
+
     private final ConcurrentMap<Integer, Metric> timingDistribution = new ConcurrentHashMap<>(1024);
 
     @SuppressWarnings("DoubleBraceInitialization")
@@ -75,6 +79,17 @@
         this.configPath = configPath;
     }
 
+    /**
+     * Creates timer metrics that report metadata changes to the given observer.
+     *
+     * @param configPath value stored in {@code configPath}
+     * @param changeNotifier observer stored for later change notifications
+     */
+    public TimerMetrics(String configPath, MetadataChangeObserver changeNotifier) {
+        this.configPath = configPath;
+        this.changeNotifier = changeNotifier;
+    }
+
     public long updateDistPair(int period, int value) {
         Metric distPair = getDistPair(period);
         return distPair.getCount().addAndGet(value);
@@ -107,9 +116,22 @@
             return pair;
         }
         pair = new Metric();
-        final Metric previous = timingCount.putIfAbsent(topic, pair);
-        if (null != previous) {
-            return previous;
+        final Metric candidate = pair;
+        final MetadataChangeObserver observer = this.changeNotifier;
+        // computeIfAbsent keeps the insert atomic and runs the callback only when this call really adds the
+        // entry. Losing the race to another thread changes nothing in the map, so no event is emitted for
+        // it; announcing the discarded candidate as an update would publish a value that was never stored.
+        // The observer is null when this object was built through the single-argument constructor, and
+        // notification is simply skipped then.
+        // NOTE(review): the callback still runs inside the map operation, as before - keep observers cheap.
+        final Metric stored = timingCount.computeIfAbsent(topic, key -> {
+            if (null != observer) {
+                observer.onCreated(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, key, candidate);
+            }
+            return candidate;
+        });
+        if (stored != candidate) {
+            return stored;
         }
         return pair;
     }
@@ -287,4 +309,25 @@
         }
     }
 
+    /**
+     * Builds a snapshot of {@code timingCount} that shares no {@link Metric} instance with the live map.
+     * <p>
+     * Every entry is copied into a fresh {@link Metric} carrying the count value and timestamp read at copy
+     * time, so later updates to the live metrics do not show up in the returned map.
+     *
+     * @return a new map from topic to copied metric
+     */
+    public ConcurrentMap<String, TimerMetrics.Metric> deepCopyTimingCountSnapshot() {
+        final ConcurrentMap<String, TimerMetrics.Metric> copies = new ConcurrentHashMap<>(this.timingCount.size());
+        this.timingCount.forEach((topic, live) -> {
+            if (live == null) {
+                return;
+            }
+            final TimerMetrics.Metric frozen = new TimerMetrics.Metric();
+            frozen.setCount(new AtomicLong(live.getCount().get()));
+            frozen.setTimeStamp(live.getTimeStamp());
+            copies.put(topic, frozen);
+        });
+        return copies;
+    }
 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index a014e77..9586a08 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -43,6 +43,7 @@
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -122,9 +123,9 @@
         if (null == rootDir) {
             rootDir = StoreTestUtils.createBaseDir();
         }
-
+        NoopMetadataChangeObserver noopMetadataChangeObserver = new NoopMetadataChangeObserver();
         TimerCheckpoint timerCheckpoint = new TimerCheckpoint(rootDir + File.separator + "config" + File.separator + "timercheck");
-        TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator + "config" + File.separator + "timermetrics");
+        TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator + "config" + File.separator + "timermetrics", noopMetadataChangeObserver);
         MessageStore ms = needMock ? mockMessageStore : messageStore;
         TimerMessageStore timerMessageStore = new TimerMessageStore(ms, storeConfig, timerCheckpoint, timerMetrics, null);
         ms.setTimerMessageStore(timerMessageStore);
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
index f664e67..489498e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
@@ -19,20 +19,24 @@
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class TimerMetricsTest {
 
+    // These tests only need a valid observer instance, so a no-op one is shared by all of them.
+    private final MetadataChangeObserver syncMetadataChangeObserver = new NoopMetadataChangeObserver();
 
     @Test
     public void testTimingCount() {
         String baseDir = StoreTestUtils.createBaseDir();
 
-        TimerMetrics first = new TimerMetrics(baseDir);
+        TimerMetrics first = new TimerMetrics(baseDir, syncMetadataChangeObserver);
         Assert.assertTrue(first.load());
         MessageExt msg = new MessageExt();
         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "AAA");
@@ -46,7 +51,7 @@
         Assert.assertTrue(first.getTopicPair("AAA").getTimeStamp() <= curr);
         first.persist();
 
-        TimerMetrics second = new TimerMetrics(baseDir);
+        TimerMetrics second = new TimerMetrics(baseDir,syncMetadataChangeObserver);
         Assert.assertTrue(second.load());
         Assert.assertEquals(1000, second.getTimingCount("AAA"));
         Assert.assertEquals(2000, second.getTimingCount("BBB"));