[RIP-79] Route Change Notification (#9549)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7b1701c..328cf77 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -80,6 +80,8 @@
 import org.apache.rocketmq.broker.processor.RecallMessageProcessor;
 import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
 import org.apache.rocketmq.broker.processor.SendMessageProcessor;
+import org.apache.rocketmq.broker.route.RouteEventService;
+import org.apache.rocketmq.broker.route.RouteEventType;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
@@ -300,6 +302,7 @@
     private TransactionMetricsFlushService transactionMetricsFlushService;
     private AuthenticationMetadataManager authenticationMetadataManager;
     private AuthorizationMetadataManager authorizationMetadataManager;
+    protected RouteEventService routeEventService;
 
     private ConfigContext configContext;
 
@@ -470,6 +473,8 @@
         if (this.authConfig != null && this.authConfig.isMigrateAuthFromV1Enabled()) {
             new AuthMigrator(this.authConfig).migrate();
         }
+        
+        this.routeEventService = new RouteEventService(this);
     }
 
     public AuthConfig getAuthConfig() {
@@ -1317,6 +1322,10 @@
         this.messageStore = messageStore;
     }
 
+    public RouteEventService getRouteEventService() {
+        return routeEventService;
+    }
+
     protected void printMasterAndSlaveDiff() {
         if (messageStore.getHaService() != null && messageStore.getHaService().getConnectionCount().get() > 0) {
             long diff = this.messageStore.slaveFallBehindMuch();
@@ -1415,6 +1424,10 @@
 
         this.unregisterBrokerAll();
 
+        if (this.routeEventService != null) {
+            this.routeEventService.publishEvent(RouteEventType.SHUTDOWN);
+        }
+
         if (this.shutdownHook != null) {
             this.shutdownHook.beforeShutdown(this);
         }
@@ -1812,6 +1825,10 @@
         if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
             changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
             this.registerBrokerAll(true, false, true);
+            if (this.routeEventService != null) {
+                this.routeEventService.publishEvent(RouteEventType.START);
+            }
+
         }
 
         scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@@ -1866,6 +1883,7 @@
                 }
             }
         }, 10, 5, TimeUnit.SECONDS);
+
     }
 
     protected void scheduleSendHeartbeat() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventConstants.java b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventConstants.java
new file mode 100644
index 0000000..ac956c6
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventConstants.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.route;
+
+public class RouteEventConstants {
+    public static final String EVENT_TYPE = "eventType";
+    public static final String BROKER_NAME = "brokerName";
+    public static final String BROKER_ID = "brokerId";
+    public static final String TIMESTAMP = "timestamp";
+    public static final String AFFECTED_TOPICS = "affectedTopics";
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventService.java b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventService.java
new file mode 100644
index 0000000..1dbc81a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventService.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.route;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.PutMessageResult;
+
+import com.alibaba.fastjson2.JSON;
+
+public class RouteEventService {
+    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    private static final int MAX_TOPICS_PER_EVENT = 5000;
+
+    public RouteEventService(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        LOG.info("RouteEventService initialized for broker: {}",
+            brokerController.getBrokerConfig().getBrokerName());
+    }
+
+    public void publishEvent(RouteEventType eventType) {
+        if (!brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+            return;
+        }
+
+        if (brokerController.getTopicConfigManager() == null) {
+            return;
+        }
+
+        Set<String> topics = brokerController.getTopicConfigManager().getTopicConfigTable().keySet();
+        publishEventInternal(eventType, topics);
+    }
+
+    public void publishEvent(RouteEventType eventType, String topicName) {
+        if (!brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+            return;
+        }
+
+        if (topicName == null) {
+            return;
+        }
+
+        publishEventInternal(eventType, Collections.singleton(topicName));
+    }
+
+    private void publishEventInternal(RouteEventType eventType, Set<String> topics) {
+        try {
+            if (topics == null || topics.isEmpty()) {
+                sendEvent(eventType, null);
+                return;
+            }
+
+            List<String> topicList = new ArrayList<>(topics);
+            partitionTopics(topicList, MAX_TOPICS_PER_EVENT)
+                .forEach(batch -> sendEvent(eventType, batch));
+
+            LOG.info("[{}]: published event for {} topics", eventType, topics.size());
+        } catch (Exception e) {
+            LOG.error("Failed to publish {} event for topics: {}", eventType, topics, e);
+        }
+    }
+
+    private void sendEvent(RouteEventType eventType, List<String> topics) {
+        Map<String, Object> eventData = createEventData(eventType, topics);
+        MessageExtBrokerInner msg = createEventMessage(eventData);
+
+        if (eventType == RouteEventType.TOPIC_CHANGE) {
+            try {
+                PutMessageResult putResult = brokerController.getMessageStore().putMessage(msg);
+                brokerController.getMessageStore().flush();
+                if (!putResult.isOk()) {
+                    LOG.warn("[ROUTE_EVENT] Publish failed: {}", putResult.getPutMessageStatus());
+                }
+            } catch (Exception e) {
+                LOG.error("[TOPIC_CHANGE_EVENT] Failed to store event locally.", e);
+            }
+            return;
+        }
+
+        if (eventType == RouteEventType.START || eventType == RouteEventType.SHUTDOWN) {
+            TopicPublishInfo routeInfo = brokerController.getTopicRouteInfoManager()
+                .tryToFindTopicPublishInfo(TopicValidator.RMQ_ROUTE_EVENT_TOPIC);
+            String currentBrokerName = brokerController.getBrokerConfig().getBrokerName();
+
+            for (MessageQueue mq : routeInfo.getMessageQueueList()) {
+                String targetBrokerName = mq.getBrokerName();
+                
+                if (targetBrokerName.equals(currentBrokerName)) {
+                    continue;
+                }
+
+                try {
+                    SendResult sendResult = brokerController.getEscapeBridge()
+                        .putMessageToRemoteBroker(msg, targetBrokerName);
+
+                    if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                        return;
+                    }
+                } catch (Exception e) {
+                    LOG.warn("[BROKER_EVENT] Exception occurred when sending {} event to broker: {}",
+                            eventType, targetBrokerName, e);
+                }
+            }
+            LOG.error("[BROKER_EVENT] Failed to send {} event to any remote broker.", eventType);
+        }
+    }
+
+    private Map<String, Object> createEventData(RouteEventType eventType, List<String> topics) {
+        Map<String, Object> eventData = new HashMap<>();
+        eventData.put(RouteEventConstants.EVENT_TYPE, eventType.name());
+        eventData.put(RouteEventConstants.BROKER_NAME, brokerController.getBrokerConfig().getBrokerName());
+        eventData.put(RouteEventConstants.BROKER_ID, brokerController.getBrokerConfig().getBrokerId());
+        eventData.put(RouteEventConstants.TIMESTAMP, System.currentTimeMillis());
+
+        if (topics != null && !topics.isEmpty()) {
+            eventData.put(RouteEventConstants.AFFECTED_TOPICS, topics);
+        }
+
+        return eventData;
+    }
+
+    private List<List<String>> partitionTopics(List<String> topics, int batchSize) {
+        List<List<String>> batches = new ArrayList<>();
+
+        for (int i = 0; i < topics.size(); i += batchSize) {
+            int end = Math.min(i + batchSize, topics.size());
+            batches.add(topics.subList(i, end));
+        }
+
+        return batches;
+    }
+    private MessageExtBrokerInner createEventMessage(Map<String, Object> eventData) {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic(TopicValidator.RMQ_ROUTE_EVENT_TOPIC);
+        msg.setBody(JSON.toJSONString(eventData).getBytes(StandardCharsets.UTF_8));
+        msg.setTags(eventData.get(RouteEventConstants.EVENT_TYPE).toString());
+        msg.setQueueId(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setBornHost(brokerController.getStoreHost());
+        msg.setStoreHost(brokerController.getStoreHost());
+        msg.setSysFlag(0);
+
+        return msg;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventType.java b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventType.java
new file mode 100644
index 0000000..0c76f70
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.route;
+
+public enum RouteEventType {
+    START,
+    SHUTDOWN,
+    TOPIC_CHANGE
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index f59651f..765f283 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -35,6 +35,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.broker.route.RouteEventType;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PopAckConstants;
@@ -224,6 +225,16 @@
                 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
             }
         }
+
+        {
+            // TopicValidator.RMQ_ROUTE_EVENT_TOPIC
+            String topic = TopicValidator.RMQ_ROUTE_EVENT_TOPIC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            TopicValidator.addSystemTopic(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            putTopicConfig(topicConfig);
+        }
     }
 
     public TopicConfig putTopicConfig(TopicConfig topicConfig) {
@@ -747,6 +758,13 @@
         } else {
             this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
         }
+        if (this.brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+            this.brokerController.getRouteEventService().publishEvent(
+                RouteEventType.TOPIC_CHANGE,
+                topicConfig.getTopicName()
+            );
+        }
+
     }
 
     public boolean containsTopic(String topic) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/route/RouteEventServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/route/RouteEventServiceTest.java
new file mode 100644
index 0000000..81f46b2
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/route/RouteEventServiceTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.route;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.alibaba.fastjson2.JSON;
+
+public class RouteEventServiceTest {
+    private BrokerController brokerController;
+    private MessageStore mockMessageStore;
+    private RouteEventService routeEventService;
+
+    @Before
+    public void setUp() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setEnableRouteChangeNotification(true);
+
+        brokerController = new BrokerController(
+            brokerConfig,
+            new NettyServerConfig(),
+            new NettyClientConfig(),
+            new MessageStoreConfig()
+        );
+
+        mockMessageStore = mock(MessageStore.class);
+        brokerController.setMessageStore(mockMessageStore);
+
+        routeEventService = new RouteEventService(brokerController);
+    }
+
+    @Test
+    public void testPublishEventSuccessfully() {
+        when(mockMessageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, null));
+
+        routeEventService.publishEvent(RouteEventType.TOPIC_CHANGE, "TestTopic");
+
+        verify(mockMessageStore).putMessage(any(MessageExtBrokerInner.class));
+    }
+
+    @Test
+    public void testIncludeTopicInEvent() {
+        when(mockMessageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, null));
+
+        routeEventService.publishEvent(RouteEventType.TOPIC_CHANGE, "TestTopic");
+
+        ArgumentCaptor<MessageExtBrokerInner> captor = ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+        verify(mockMessageStore).putMessage(captor.capture());
+
+        Map<String, Object> eventData = JSON.parseObject(
+            new String(captor.getValue().getBody()),
+            Map.class
+        );
+
+        List<String> affectedTopics = (List<String>) eventData.get(RouteEventConstants.AFFECTED_TOPICS);
+
+        List<String> expectedTopics = Collections.singletonList("TestTopic");
+
+        assertEquals(expectedTopics, affectedTopics);
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a464355..8117fc0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -141,6 +141,8 @@
 
     private boolean slaveReadEnable = false;
 
+    private boolean enableRouteChangeNotification = false;
+
     private boolean disableConsumeIfConsumerReadSlowly = false;
     private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
 
@@ -749,6 +751,14 @@
         this.slaveReadEnable = slaveReadEnable;
     }
 
+    public boolean isEnableRouteChangeNotification() {
+        return enableRouteChangeNotification;
+    }
+
+    public void setEnableRouteChangeNotification(final boolean enableRouteChangeNotification) {
+        this.enableRouteChangeNotification = enableRouteChangeNotification;
+    }
+
     public int getRegisterBrokerTimeoutMills() {
         return registerBrokerTimeoutMills;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
index 47d45c6..fe5b7d3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
@@ -33,6 +33,7 @@
     public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
     public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
     public static final String RMQ_SYS_ROCKSDB_OFFSET_TOPIC = "CHECKPOINT_TOPIC";
+    public static final String RMQ_ROUTE_EVENT_TOPIC = "ROUTE_EVENT_TOPIC";
 
     public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
     public static final String SYNC_BROKER_MEMBER_GROUP_PREFIX = SYSTEM_TOPIC_PREFIX + "SYNC_BROKER_MEMBER_";
@@ -64,6 +65,7 @@
         SYSTEM_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC);
         SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
         SYSTEM_TOPIC_SET.add(RMQ_SYS_ROCKSDB_OFFSET_TOPIC);
+        SYSTEM_TOPIC_SET.add(RMQ_ROUTE_EVENT_TOPIC);
 
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC);
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TRANS_HALF_TOPIC);
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 41ce282..5c1d368 100644
--- a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -21,6 +21,7 @@
 import org.apache.rocketmq.auth.config.AuthConfig;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.route.RouteEventType;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
@@ -80,6 +81,10 @@
         if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
             changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
             this.registerBrokerAll(true, false, true);
+            if (this.routeEventService != null) {
+                this.routeEventService.publishEvent(RouteEventType.START);
+            }
+
         }
 
         scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index a99b0af..5e15caf 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -207,6 +207,8 @@
 
     private boolean enableAclRpcHookForClusterMode = false;
 
+    private boolean enableRouteChangeNotification = false;
+
     private boolean useDelayLevel = false;
     private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
     private transient ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentSkipListMap<>();
@@ -1062,6 +1064,14 @@
         this.enableAclRpcHookForClusterMode = enableAclRpcHookForClusterMode;
     }
 
+    public boolean isEnableRouteChangeNotification() {
+        return enableRouteChangeNotification;
+    }
+
+    public void setEnableRouteChangeNotification(final boolean enableRouteChangeNotification) {
+        this.enableRouteChangeNotification = enableRouteChangeNotification;
+    }
+
     public boolean isEnableTopicMessageTypeCheck() {
         return enableTopicMessageTypeCheck;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifier.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifier.java
new file mode 100644
index 0000000..23fa229
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifier.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.service.route;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.broker.route.RouteEventConstants;
+import org.apache.rocketmq.broker.route.RouteEventType;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.alibaba.fastjson2.JSON;
+
+public class RouteChangeNotifier {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+    
+    private final LoadingCache<String, MessageQueueView> topicCache;
+    private final ThreadPoolExecutor routeRefreshExecutor;
+    
+    private final ConcurrentMap<String, Long> dirtyTopics = new ConcurrentHashMap<>();
+    private final Queue<String> pendingTopics = new ConcurrentLinkedQueue<>();
+    private final ScheduledExecutorService scheduler;
+    
+    private final DefaultMQPushConsumer consumer;
+    private final BiConsumer<String, Long> dirtyMarker;
+    
+    public RouteChangeNotifier(LoadingCache<String, MessageQueueView> topicCache) {
+        this.topicCache = topicCache;
+
+        this.scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
+            new ThreadFactoryImpl("RouteUpdaterScheduler_")
+        );
+
+        this.routeRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
+            2,
+            4,
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            "TopicRouteCacheRefresh",
+            1000
+        );
+
+        this.consumer = new DefaultMQPushConsumer("PROXY_ROUTE_EVENT_GROUP");
+        this.consumer.setMessageModel(MessageModel.BROADCASTING);
+        this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+        this.dirtyMarker = (topic, timeStamp) -> {
+            markCacheDirty(topic, timeStamp);
+        };
+    }
+
+    public void start() {
+        startEventSubscription();
+
+        scheduler.scheduleWithFixedDelay(this::processDirtyTopics, 50, 200, TimeUnit.MILLISECONDS);
+    }
+
+    public void shutdown() {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+
+        if (scheduler != null) {
+            scheduler.shutdown();
+        }
+
+        if (routeRefreshExecutor != null) {
+            routeRefreshExecutor.shutdown();
+        }
+    }
+    
+    private void startEventSubscription() {
+        try {
+            consumer.subscribe(TopicValidator.RMQ_ROUTE_EVENT_TOPIC, "*");
+            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+                processEventMessages(msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            });
+
+            consumer.start();
+        } catch (MQClientException e) {
+            log.error("Failed to start route event consumer", e);
+        }
+    }
+
+    private void processEventMessages(List<MessageExt> msgs) {
+        for (MessageExt msg : msgs) {
+            try {
+                String json = new String(msg.getBody(), StandardCharsets.UTF_8);
+                Map<String, Object> event = JSON.parseObject(json, Map.class);
+
+                log.info("[ROUTE_EVENT]: accept event {}", event);
+                RouteEventType eventType = RouteEventType.valueOf((String) event.get(RouteEventConstants.EVENT_TYPE));
+                Long eventTimeStamp = (Long) event.get(RouteEventConstants.TIMESTAMP);
+
+                switch (eventType) {
+                    case START:
+                    case SHUTDOWN:
+                    case TOPIC_CHANGE:
+                        List<String> affectedTopics = (List<String>) event.get(RouteEventConstants.AFFECTED_TOPICS);
+
+                        if (affectedTopics != null) {
+                            for (String topic : affectedTopics) {
+                                dirtyMarker.accept(topic, eventTimeStamp);
+                            }
+                        } else {
+                            log.info("[ROUTE_UPDATE] No affected topic specified in event: {}", event);
+                        }
+                        break;
+
+                    default:
+                        break;
+                }
+                
+            } catch (Exception e) {
+                log.error("[ROUTE_UPDATE]: Error processing route event", e);
+            }
+        }
+    }
+
+    public void markCacheDirty(String topic, long timeStamp) {
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - timeStamp > TimeUnit.MINUTES.toMillis(1)) {
+            return;
+        }
+
+        dirtyTopics.put(topic, currentTime);
+        pendingTopics.offer(topic);
+    }
+
+    public void markCompleted(String topic) {
+        dirtyTopics.remove(topic);
+    }
+
+    private void processDirtyTopics() {
+        List<String> batch = new ArrayList<>();
+        while (!pendingTopics.isEmpty() && batch.size() < 100) {
+            String topic = pendingTopics.poll();
+            if (topic == null) break;
+
+            Long timestamp = dirtyTopics.get(topic);
+            if (timestamp == null) continue;
+
+            if (System.currentTimeMillis() - timestamp > TimeUnit.MINUTES.toMillis(1)) {
+                markCompleted(topic);
+                log.error("refreshing topic: {} stop for delay", topic);
+                continue;
+            }
+
+            batch.add(topic);
+        }
+
+        for (String topic : batch) {
+            routeRefreshExecutor.execute(() -> {
+                refreshSingleRoute(topic);
+                markCompleted(topic);
+            });
+        }
+    }
+
+    private void refreshSingleRoute(String topic) {
+        try {
+            if (topicCache.getIfPresent(topic) == null) {
+                return;
+            }
+
+            topicCache.refresh(topic);
+
+        } catch (Exception e) {
+            log.error("Refresh failed for: {}", topic, e);
+        }
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index bcdf814..7b5ad1e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -56,6 +57,8 @@
     private final MQClientAPIFactory mqClientAPIFactory;
     private MQFaultStrategy mqFaultStrategy;
 
+    private RouteChangeNotifier routeChangeNotifier;
+
     protected final LoadingCache<String /* topicName */, MessageQueueView> topicCache;
     protected final ScheduledExecutorService scheduledExecutorService;
     protected final ThreadPoolExecutor cacheRefreshExecutor;
@@ -85,6 +88,7 @@
                 public @Nullable MessageQueueView load(String topic) throws Exception {
                     try {
                         TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis());
+
                         return buildMessageQueueView(topic, topicRouteData);
                     } catch (Exception e) {
                         if (TopicRouteHelper.isTopicNotExistError(e)) {
@@ -98,6 +102,11 @@
                 public @Nullable MessageQueueView reload(@NonNull String key,
                     @NonNull MessageQueueView oldValue) throws Exception {
                     try {
+                        if (routeChangeNotifier != null
+                            && ConfigurationManager.getProxyConfig().isEnableRouteChangeNotification()) {
+                            routeChangeNotifier.markCompleted(key);
+                        }
+
                         return load(key);
                     } catch (Exception e) {
                         log.warn(String.format("reload topic route from namesrv. topic: %s", key), e);
@@ -134,6 +143,9 @@
                 }
             }
         }, serviceDetector);
+
+        this.routeChangeNotifier = new RouteChangeNotifier(this.topicCache);
+
         this.init();
     }
 
@@ -155,6 +167,10 @@
         if (this.mqFaultStrategy.isStartDetectorEnable()) {
             mqFaultStrategy.shutdown();
         }
+
+        if (ConfigurationManager.getProxyConfig().isEnableRouteChangeNotification()) {
+            this.routeChangeNotifier.shutdown();
+        }
     }
 
     @Override
@@ -162,6 +178,10 @@
         if (this.mqFaultStrategy.isStartDetectorEnable()) {
             this.mqFaultStrategy.startDetector();
         }
+
+        if (ConfigurationManager.getProxyConfig().isEnableRouteChangeNotification()) {
+            this.routeChangeNotifier.start();
+        }
     }
 
     public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifierTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifierTest.java
new file mode 100644
index 0000000..b46c46f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifierTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.fastjson2.JSON;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+
+public class RouteChangeNotifierTest {
+    private LoadingCache<String, MessageQueueView> mockCache;
+    private ThreadPoolExecutor mockExecutor;
+    private RouteChangeNotifier notifier;
+
+    @Before
+    public void setUp() {
+        mockCache = mock(LoadingCache.class);
+        mockExecutor = mock(ThreadPoolExecutor.class);
+        notifier = new RouteChangeNotifier(mockCache);
+    }
+
+    @Test
+    public void testHandleShutdownEvent() throws Exception {
+        Map<String, Object> eventData = new HashMap<>();
+        eventData.put("eventType", "SHUTDOWN");
+        eventData.put("brokerName", "TestBroker");
+        eventData.put("timestamp", System.currentTimeMillis());
+        eventData.put("affectedTopics", Arrays.asList("TopicA", "TopicB"));
+
+        MessageExt msg = new MessageExt();
+        msg.setBody(JSON.toJSONString(eventData).getBytes());
+
+        invokePrivateMethod(notifier, "processEventMessages", Collections.singletonList(msg));
+
+        verifyMarkCacheDirtyCalled("TopicA");
+        verifyMarkCacheDirtyCalled("TopicB");
+    }
+
+    @Test
+    public void testHandleTopicChangeEvent() throws Exception {
+        Map<String, Object> eventData = new HashMap<>();
+        eventData.put("eventType", "TOPIC_CHANGE");
+        eventData.put("brokerName", "TestBroker");
+        eventData.put("affectedTopics", Collections.singletonList("TestTopic"));
+        eventData.put("timestamp", System.currentTimeMillis());
+
+        MessageExt msg = new MessageExt();
+        msg.setBody(JSON.toJSONString(eventData).getBytes());
+
+        invokePrivateMethod(notifier, "processEventMessages", Collections.singletonList(msg));
+
+        verifyMarkCacheDirtyCalled("TestTopic");
+    }
+
+    @Test
+    public void testMarkCacheDirtyAddsToPendingTopics() throws Exception {
+        notifier.markCacheDirty("TestTopic", System.currentTimeMillis());
+
+        Field pendingTopicsField = RouteChangeNotifier.class.getDeclaredField("pendingTopics");
+        pendingTopicsField.setAccessible(true);
+        Queue<String> pendingTopics = (Queue<String>) pendingTopicsField.get(notifier);
+
+        assertTrue(pendingTopics.contains("TestTopic"));
+    }
+
+    @Test
+    public void testRefreshTriggersCacheRefresh() throws Exception {
+        notifier.markCacheDirty("TestTopic", System.currentTimeMillis());
+
+        when(mockCache.getIfPresent("TestTopic")).thenReturn(mock(MessageQueueView.class));
+
+        invokePrivateMethod(notifier, "refreshSingleRoute", "TestTopic");
+
+        verify(mockCache).refresh("TestTopic");
+    }
+
+    @Test
+    public void testRefreshFailureAddsToRetryQueue() throws Exception {
+        notifier.markCacheDirty("TestTopic", System.currentTimeMillis());
+
+        when(mockCache.getIfPresent("TestTopic")).thenReturn(mock(MessageQueueView.class));
+
+        doThrow(new RuntimeException("Refresh error")).when(mockCache).refresh("TestTopic");
+
+        invokePrivateMethod(notifier, "refreshSingleRoute", "TestTopic");
+
+        Field pendingTopicsField = RouteChangeNotifier.class.getDeclaredField("pendingTopics");
+        pendingTopicsField.setAccessible(true);
+        Queue<String> pendingTopics = (Queue<String>) pendingTopicsField.get(notifier);
+
+        assertTrue(pendingTopics.contains("TestTopic"));
+    }
+
+    private void invokePrivateMethod(Object obj, String methodName, Object arg) throws Exception {
+        java.lang.reflect.Method method;
+        
+        if (arg instanceof List) {
+            method = obj.getClass().getDeclaredMethod(methodName, List.class);
+            method.setAccessible(true);
+            method.invoke(obj, arg);
+        } else if (arg instanceof String) {
+            method = obj.getClass().getDeclaredMethod(methodName, String.class);
+            method.setAccessible(true);
+            method.invoke(obj, arg);
+        }
+    }
+
+    private void verifyMarkCacheDirtyCalled(String topic) throws Exception {
+        Field pendingTopicsField = RouteChangeNotifier.class.getDeclaredField("pendingTopics");
+        pendingTopicsField.setAccessible(true);
+        Queue<String> pendingTopics = (Queue<String>) pendingTopicsField.get(notifier);
+        
+        assertTrue("Topic " + topic + " should be in pending topics", 
+                  pendingTopics.contains(topic));
+    }
+}