[RIP-79] Route Change Notification (#9549)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7b1701c..328cf77 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -80,6 +80,8 @@
import org.apache.rocketmq.broker.processor.RecallMessageProcessor;
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
+import org.apache.rocketmq.broker.route.RouteEventService;
+import org.apache.rocketmq.broker.route.RouteEventType;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
@@ -300,6 +302,7 @@
private TransactionMetricsFlushService transactionMetricsFlushService;
private AuthenticationMetadataManager authenticationMetadataManager;
private AuthorizationMetadataManager authorizationMetadataManager;
+ protected RouteEventService routeEventService;
private ConfigContext configContext;
@@ -470,6 +473,8 @@
if (this.authConfig != null && this.authConfig.isMigrateAuthFromV1Enabled()) {
new AuthMigrator(this.authConfig).migrate();
}
+
+ this.routeEventService = new RouteEventService(this);
}
public AuthConfig getAuthConfig() {
@@ -1317,6 +1322,10 @@
this.messageStore = messageStore;
}
+ public RouteEventService getRouteEventService() {
+ return routeEventService;
+ }
+
protected void printMasterAndSlaveDiff() {
if (messageStore.getHaService() != null && messageStore.getHaService().getConnectionCount().get() > 0) {
long diff = this.messageStore.slaveFallBehindMuch();
@@ -1415,6 +1424,10 @@
this.unregisterBrokerAll();
+ if (this.routeEventService != null) {
+ this.routeEventService.publishEvent(RouteEventType.SHUTDOWN);
+ }
+
if (this.shutdownHook != null) {
this.shutdownHook.beforeShutdown(this);
}
@@ -1812,6 +1825,10 @@
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
+ if (this.routeEventService != null) {
+ this.routeEventService.publishEvent(RouteEventType.START);
+ }
+
}
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@@ -1866,6 +1883,7 @@
}
}
}, 10, 5, TimeUnit.SECONDS);
+
}
protected void scheduleSendHeartbeat() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventConstants.java b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventConstants.java
new file mode 100644
index 0000000..ac956c6
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventConstants.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.route;
+
+public class RouteEventConstants {
+ public static final String EVENT_TYPE = "eventType";
+ public static final String BROKER_NAME = "brokerName";
+ public static final String BROKER_ID = "brokerId";
+ public static final String TIMESTAMP = "timestamp";
+ public static final String AFFECTED_TOPICS = "affectedTopics";
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventService.java b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventService.java
new file mode 100644
index 0000000..1dbc81a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventService.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.route;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.PutMessageResult;
+
+import com.alibaba.fastjson2.JSON;
+
+public class RouteEventService {
+ private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final BrokerController brokerController;
+ private static final int MAX_TOPICS_PER_EVENT = 5000;
+
+ public RouteEventService(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ LOG.info("RouteEventService initialized for broker: {}",
+ brokerController.getBrokerConfig().getBrokerName());
+ }
+
+ public void publishEvent(RouteEventType eventType) {
+ if (!brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+ return;
+ }
+
+ if (brokerController.getTopicConfigManager() == null) {
+ return;
+ }
+
+ Set<String> topics = brokerController.getTopicConfigManager().getTopicConfigTable().keySet();
+ publishEventInternal(eventType, topics);
+ }
+
+ public void publishEvent(RouteEventType eventType, String topicName) {
+ if (!brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+ return;
+ }
+
+ if (topicName == null) {
+ return;
+ }
+
+ publishEventInternal(eventType, Collections.singleton(topicName));
+ }
+
+ private void publishEventInternal(RouteEventType eventType, Set<String> topics) {
+ try {
+ if (topics == null || topics.isEmpty()) {
+ sendEvent(eventType, null);
+ return;
+ }
+
+ List<String> topicList = new ArrayList<>(topics);
+ partitionTopics(topicList, MAX_TOPICS_PER_EVENT)
+ .forEach(batch -> sendEvent(eventType, batch));
+
+ LOG.info("[{}]: published event for {} topics", eventType, topics.size());
+ } catch (Exception e) {
+ LOG.error("Failed to publish {} event for topics: {}", eventType, topics, e);
+ }
+ }
+
+ private void sendEvent(RouteEventType eventType, List<String> topics) {
+ Map<String, Object> eventData = createEventData(eventType, topics);
+ MessageExtBrokerInner msg = createEventMessage(eventData);
+
+ if (eventType == RouteEventType.TOPIC_CHANGE) {
+ try {
+ PutMessageResult putResult = brokerController.getMessageStore().putMessage(msg);
+ brokerController.getMessageStore().flush();
+ if (!putResult.isOk()) {
+ LOG.warn("[ROUTE_EVENT] Publish failed: {}", putResult.getPutMessageStatus());
+ }
+ } catch (Exception e) {
+ LOG.error("[TOPIC_CHANGE_EVENT] Failed to store event locally.", e);
+ }
+ return;
+ }
+
+ if (eventType == RouteEventType.START || eventType == RouteEventType.SHUTDOWN) {
+ TopicPublishInfo routeInfo = brokerController.getTopicRouteInfoManager()
+ .tryToFindTopicPublishInfo(TopicValidator.RMQ_ROUTE_EVENT_TOPIC);
+ String currentBrokerName = brokerController.getBrokerConfig().getBrokerName();
+
+ for (MessageQueue mq : routeInfo.getMessageQueueList()) {
+ String targetBrokerName = mq.getBrokerName();
+
+ if (targetBrokerName.equals(currentBrokerName)) {
+ continue;
+ }
+
+ try {
+ SendResult sendResult = brokerController.getEscapeBridge()
+ .putMessageToRemoteBroker(msg, targetBrokerName);
+
+ if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ return;
+ }
+ } catch (Exception e) {
+ LOG.warn("[BROKER_EVENT] Exception occurred when sending {} event to broker: {}",
+ eventType, targetBrokerName, e);
+ }
+ }
+ LOG.error("[BROKER_EVENT] Failed to send {} event to any remote broker.", eventType);
+ }
+ }
+
+ private Map<String, Object> createEventData(RouteEventType eventType, List<String> topics) {
+ Map<String, Object> eventData = new HashMap<>();
+ eventData.put(RouteEventConstants.EVENT_TYPE, eventType.name());
+ eventData.put(RouteEventConstants.BROKER_NAME, brokerController.getBrokerConfig().getBrokerName());
+ eventData.put(RouteEventConstants.BROKER_ID, brokerController.getBrokerConfig().getBrokerId());
+ eventData.put(RouteEventConstants.TIMESTAMP, System.currentTimeMillis());
+
+ if (topics != null && !topics.isEmpty()) {
+ eventData.put(RouteEventConstants.AFFECTED_TOPICS, topics);
+ }
+
+ return eventData;
+ }
+
+ private List<List<String>> partitionTopics(List<String> topics, int batchSize) {
+ List<List<String>> batches = new ArrayList<>();
+
+ for (int i = 0; i < topics.size(); i += batchSize) {
+ int end = Math.min(i + batchSize, topics.size());
+ batches.add(topics.subList(i, end));
+ }
+
+ return batches;
+ }
+ private MessageExtBrokerInner createEventMessage(Map<String, Object> eventData) {
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.setTopic(TopicValidator.RMQ_ROUTE_EVENT_TOPIC);
+ msg.setBody(JSON.toJSONString(eventData).getBytes(StandardCharsets.UTF_8));
+ msg.setTags(eventData.get(RouteEventConstants.EVENT_TYPE).toString());
+ msg.setQueueId(0);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setBornHost(brokerController.getStoreHost());
+ msg.setStoreHost(brokerController.getStoreHost());
+ msg.setSysFlag(0);
+
+ return msg;
+ }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventType.java b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventType.java
new file mode 100644
index 0000000..0c76f70
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.route;
+
+public enum RouteEventType {
+ START,
+ SHUTDOWN,
+ TOPIC_CHANGE
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index f59651f..765f283 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -35,6 +35,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.broker.route.RouteEventType;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
@@ -224,6 +225,16 @@
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
+
+ {
+ // TopicValidator.RMQ_ROUTE_EVENT_TOPIC
+ String topic = TopicValidator.RMQ_ROUTE_EVENT_TOPIC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ TopicValidator.addSystemTopic(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ putTopicConfig(topicConfig);
+ }
}
public TopicConfig putTopicConfig(TopicConfig topicConfig) {
@@ -747,6 +758,13 @@
} else {
this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
}
+ if (this.brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+ this.brokerController.getRouteEventService().publishEvent(
+ RouteEventType.TOPIC_CHANGE,
+ topicConfig.getTopicName()
+ );
+ }
+
}
public boolean containsTopic(String topic) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/route/RouteEventServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/route/RouteEventServiceTest.java
new file mode 100644
index 0000000..81f46b2
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/route/RouteEventServiceTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.route;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.alibaba.fastjson2.JSON;
+
+public class RouteEventServiceTest {
+ private BrokerController brokerController;
+ private MessageStore mockMessageStore;
+ private RouteEventService routeEventService;
+
+ @Before
+ public void setUp() {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setEnableRouteChangeNotification(true);
+
+ brokerController = new BrokerController(
+ brokerConfig,
+ new NettyServerConfig(),
+ new NettyClientConfig(),
+ new MessageStoreConfig()
+ );
+
+ mockMessageStore = mock(MessageStore.class);
+ brokerController.setMessageStore(mockMessageStore);
+
+ routeEventService = new RouteEventService(brokerController);
+ }
+
+ @Test
+ public void testPublishEventSuccessfully() {
+ when(mockMessageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, null));
+
+ routeEventService.publishEvent(RouteEventType.TOPIC_CHANGE, "TestTopic");
+
+ verify(mockMessageStore).putMessage(any(MessageExtBrokerInner.class));
+ }
+
+ @Test
+ public void testIncludeTopicInEvent() {
+ when(mockMessageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, null));
+
+ routeEventService.publishEvent(RouteEventType.TOPIC_CHANGE, "TestTopic");
+
+ ArgumentCaptor<MessageExtBrokerInner> captor = ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+ verify(mockMessageStore).putMessage(captor.capture());
+
+ Map<String, Object> eventData = JSON.parseObject(
+ new String(captor.getValue().getBody()),
+ Map.class
+ );
+
+ List<String> affectedTopics = (List<String>) eventData.get(RouteEventConstants.AFFECTED_TOPICS);
+
+ List<String> expectedTopics = Collections.singletonList("TestTopic");
+
+ assertEquals(expectedTopics, affectedTopics);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a464355..8117fc0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -141,6 +141,8 @@
private boolean slaveReadEnable = false;
+ private boolean enableRouteChangeNotification = false;
+
private boolean disableConsumeIfConsumerReadSlowly = false;
private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
@@ -749,6 +751,14 @@
this.slaveReadEnable = slaveReadEnable;
}
+ public boolean isEnableRouteChangeNotification() {
+ return enableRouteChangeNotification;
+ }
+
+ public void setEnableRouteChangeNotification(final boolean enableRouteChangeNotification) {
+ this.enableRouteChangeNotification = enableRouteChangeNotification;
+ }
+
public int getRegisterBrokerTimeoutMills() {
return registerBrokerTimeoutMills;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
index 47d45c6..fe5b7d3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
@@ -33,6 +33,7 @@
public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
public static final String RMQ_SYS_ROCKSDB_OFFSET_TOPIC = "CHECKPOINT_TOPIC";
+ public static final String RMQ_ROUTE_EVENT_TOPIC = "ROUTE_EVENT_TOPIC";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String SYNC_BROKER_MEMBER_GROUP_PREFIX = SYSTEM_TOPIC_PREFIX + "SYNC_BROKER_MEMBER_";
@@ -64,6 +65,7 @@
SYSTEM_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
SYSTEM_TOPIC_SET.add(RMQ_SYS_ROCKSDB_OFFSET_TOPIC);
+ SYSTEM_TOPIC_SET.add(RMQ_ROUTE_EVENT_TOPIC);
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC);
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TRANS_HALF_TOPIC);
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 41ce282..5c1d368 100644
--- a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -21,6 +21,7 @@
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.route.RouteEventType;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
@@ -80,6 +81,10 @@
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
+ if (this.routeEventService != null) {
+ this.routeEventService.publishEvent(RouteEventType.START);
+ }
+
}
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index a99b0af..5e15caf 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -207,6 +207,8 @@
private boolean enableAclRpcHookForClusterMode = false;
+ private boolean enableRouteChangeNotification = false;
+
private boolean useDelayLevel = false;
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
private transient ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentSkipListMap<>();
@@ -1062,6 +1064,14 @@
this.enableAclRpcHookForClusterMode = enableAclRpcHookForClusterMode;
}
+ public boolean isEnableRouteChangeNotification() {
+ return enableRouteChangeNotification;
+ }
+
+ public void setEnableRouteChangeNotification(final boolean enableRouteChangeNotification) {
+ this.enableRouteChangeNotification = enableRouteChangeNotification;
+ }
+
public boolean isEnableTopicMessageTypeCheck() {
return enableTopicMessageTypeCheck;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifier.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifier.java
new file mode 100644
index 0000000..23fa229
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifier.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.service.route;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.broker.route.RouteEventConstants;
+import org.apache.rocketmq.broker.route.RouteEventType;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.alibaba.fastjson2.JSON;
+
+public class RouteChangeNotifier {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+
+ private final LoadingCache<String, MessageQueueView> topicCache;
+ private final ThreadPoolExecutor routeRefreshExecutor;
+
+ private final ConcurrentMap<String, Long> dirtyTopics = new ConcurrentHashMap<>();
+ private final Queue<String> pendingTopics = new ConcurrentLinkedQueue<>();
+ private final ScheduledExecutorService scheduler;
+
+ private final DefaultMQPushConsumer consumer;
+ private final BiConsumer<String, Long> dirtyMarker;
+
+ public RouteChangeNotifier(LoadingCache<String, MessageQueueView> topicCache) {
+ this.topicCache = topicCache;
+
+ this.scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
+ new ThreadFactoryImpl("RouteUpdaterScheduler_")
+ );
+
+ this.routeRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
+ 2,
+ 4,
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ "TopicRouteCacheRefresh",
+ 1000
+ );
+
+ this.consumer = new DefaultMQPushConsumer("PROXY_ROUTE_EVENT_GROUP");
+ this.consumer.setMessageModel(MessageModel.BROADCASTING);
+ this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+ this.dirtyMarker = (topic, timeStamp) -> {
+ markCacheDirty(topic, timeStamp);
+ };
+ }
+
+ public void start() {
+ startEventSubscription();
+
+ scheduler.scheduleWithFixedDelay(this::processDirtyTopics, 50, 200, TimeUnit.MILLISECONDS);
+ }
+
+ public void shutdown() {
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+
+ if (scheduler != null) {
+ scheduler.shutdown();
+ }
+
+ if (routeRefreshExecutor != null) {
+ routeRefreshExecutor.shutdown();
+ }
+ }
+
+ private void startEventSubscription() {
+ try {
+ consumer.subscribe(TopicValidator.RMQ_ROUTE_EVENT_TOPIC, "*");
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+ processEventMessages(msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+
+ consumer.start();
+ } catch (MQClientException e) {
+ log.error("Failed to start route event consumer", e);
+ }
+ }
+
+ private void processEventMessages(List<MessageExt> msgs) {
+ for (MessageExt msg : msgs) {
+ try {
+ String json = new String(msg.getBody(), StandardCharsets.UTF_8);
+ Map<String, Object> event = JSON.parseObject(json, Map.class);
+
+ log.info("[ROUTE_EVENT]: accept event {}", event);
+ RouteEventType eventType = RouteEventType.valueOf((String) event.get(RouteEventConstants.EVENT_TYPE));
+ Long eventTimeStamp = (Long) event.get(RouteEventConstants.TIMESTAMP);
+
+ switch (eventType) {
+ case START:
+ case SHUTDOWN:
+ case TOPIC_CHANGE:
+ List<String> affectedTopics = (List<String>) event.get(RouteEventConstants.AFFECTED_TOPICS);
+
+ if (affectedTopics != null) {
+ for (String topic : affectedTopics) {
+ dirtyMarker.accept(topic, eventTimeStamp);
+ }
+ } else {
+ log.info("[ROUTE_UPDATE] No affected topic specified in event: {}", event);
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ } catch (Exception e) {
+ log.error("[ROUTE_UPDATE]: Error processing route event", e);
+ }
+ }
+ }
+
+ public void markCacheDirty(String topic, long timeStamp) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - timeStamp > TimeUnit.MINUTES.toMillis(1)) {
+ return;
+ }
+
+ dirtyTopics.put(topic, currentTime);
+ pendingTopics.offer(topic);
+ }
+
+ public void markCompleted(String topic) {
+ dirtyTopics.remove(topic);
+ }
+
+ private void processDirtyTopics() {
+ List<String> batch = new ArrayList<>();
+ while (!pendingTopics.isEmpty() && batch.size() < 100) {
+ String topic = pendingTopics.poll();
+ if (topic == null) break;
+
+ Long timestamp = dirtyTopics.get(topic);
+ if (timestamp == null) continue;
+
+ if (System.currentTimeMillis() - timestamp > TimeUnit.MINUTES.toMillis(1)) {
+ markCompleted(topic);
+ log.error("refreshing topic: {} stop for delay", topic);
+ continue;
+ }
+
+ batch.add(topic);
+ }
+
+ for (String topic : batch) {
+ routeRefreshExecutor.execute(() -> {
+ refreshSingleRoute(topic);
+ markCompleted(topic);
+ });
+ }
+ }
+
+ private void refreshSingleRoute(String topic) {
+ try {
+ if (topicCache.getIfPresent(topic) == null) {
+ return;
+ }
+
+ topicCache.refresh(topic);
+
+ } catch (Exception e) {
+ log.error("Refresh failed for: {}", topic, e);
+ }
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index bcdf814..7b5ad1e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -56,6 +57,8 @@
private final MQClientAPIFactory mqClientAPIFactory;
private MQFaultStrategy mqFaultStrategy;
+ private RouteChangeNotifier routeChangeNotifier;
+
protected final LoadingCache<String /* topicName */, MessageQueueView> topicCache;
protected final ScheduledExecutorService scheduledExecutorService;
protected final ThreadPoolExecutor cacheRefreshExecutor;
@@ -85,6 +88,7 @@
public @Nullable MessageQueueView load(String topic) throws Exception {
try {
TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis());
+
return buildMessageQueueView(topic, topicRouteData);
} catch (Exception e) {
if (TopicRouteHelper.isTopicNotExistError(e)) {
@@ -98,6 +102,11 @@
public @Nullable MessageQueueView reload(@NonNull String key,
@NonNull MessageQueueView oldValue) throws Exception {
try {
+ if (routeChangeNotifier != null
+ && ConfigurationManager.getProxyConfig().isEnableRouteChangeNotification()) {
+ routeChangeNotifier.markCompleted(key);
+ }
+
return load(key);
} catch (Exception e) {
log.warn(String.format("reload topic route from namesrv. topic: %s", key), e);
@@ -134,6 +143,9 @@
}
}
}, serviceDetector);
+
+ this.routeChangeNotifier = new RouteChangeNotifier(this.topicCache);
+
this.init();
}
@@ -155,6 +167,10 @@
if (this.mqFaultStrategy.isStartDetectorEnable()) {
mqFaultStrategy.shutdown();
}
+
+ if (ConfigurationManager.getProxyConfig().isEnableRouteChangeNotification()) {
+ this.routeChangeNotifier.shutdown();
+ }
}
@Override
@@ -162,6 +178,10 @@
if (this.mqFaultStrategy.isStartDetectorEnable()) {
this.mqFaultStrategy.startDetector();
}
+
+ if (ConfigurationManager.getProxyConfig().isEnableRouteChangeNotification()) {
+ this.routeChangeNotifier.start();
+ }
}
public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifierTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifierTest.java
new file mode 100644
index 0000000..b46c46f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/RouteChangeNotifierTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.fastjson2.JSON;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+
+public class RouteChangeNotifierTest {
+ private LoadingCache<String, MessageQueueView> mockCache;
+ private ThreadPoolExecutor mockExecutor;
+ private RouteChangeNotifier notifier;
+
+ @Before
+ public void setUp() {
+ mockCache = mock(LoadingCache.class);
+ mockExecutor = mock(ThreadPoolExecutor.class);
+ notifier = new RouteChangeNotifier(mockCache);
+ }
+
+ @Test
+ public void testHandleShutdownEvent() throws Exception {
+ Map<String, Object> eventData = new HashMap<>();
+ eventData.put("eventType", "SHUTDOWN");
+ eventData.put("brokerName", "TestBroker");
+ eventData.put("timestamp", System.currentTimeMillis());
+ eventData.put("affectedTopics", Arrays.asList("TopicA", "TopicB"));
+
+ MessageExt msg = new MessageExt();
+ msg.setBody(JSON.toJSONString(eventData).getBytes());
+
+ invokePrivateMethod(notifier, "processEventMessages", Collections.singletonList(msg));
+
+ verifyMarkCacheDirtyCalled("TopicA");
+ verifyMarkCacheDirtyCalled("TopicB");
+ }
+
+ @Test
+ public void testHandleTopicChangeEvent() throws Exception {
+ Map<String, Object> eventData = new HashMap<>();
+ eventData.put("eventType", "TOPIC_CHANGE");
+ eventData.put("brokerName", "TestBroker");
+ eventData.put("affectedTopics", Collections.singletonList("TestTopic"));
+ eventData.put("timestamp", System.currentTimeMillis());
+
+ MessageExt msg = new MessageExt();
+ msg.setBody(JSON.toJSONString(eventData).getBytes());
+
+ invokePrivateMethod(notifier, "processEventMessages", Collections.singletonList(msg));
+
+ verifyMarkCacheDirtyCalled("TestTopic");
+ }
+
+ @Test
+ public void testMarkCacheDirtyAddsToPendingTopics() throws Exception {
+ notifier.markCacheDirty("TestTopic", System.currentTimeMillis());
+
+ Field pendingTopicsField = RouteChangeNotifier.class.getDeclaredField("pendingTopics");
+ pendingTopicsField.setAccessible(true);
+ Queue<String> pendingTopics = (Queue<String>) pendingTopicsField.get(notifier);
+
+ assertTrue(pendingTopics.contains("TestTopic"));
+ }
+
+ @Test
+ public void testRefreshTriggersCacheRefresh() throws Exception {
+ notifier.markCacheDirty("TestTopic", System.currentTimeMillis());
+
+ when(mockCache.getIfPresent("TestTopic")).thenReturn(mock(MessageQueueView.class));
+
+ invokePrivateMethod(notifier, "refreshSingleRoute", "TestTopic");
+
+ verify(mockCache).refresh("TestTopic");
+ }
+
+ @Test
+ public void testRefreshFailureAddsToRetryQueue() throws Exception {
+ notifier.markCacheDirty("TestTopic", System.currentTimeMillis());
+
+ when(mockCache.getIfPresent("TestTopic")).thenReturn(mock(MessageQueueView.class));
+
+ doThrow(new RuntimeException("Refresh error")).when(mockCache).refresh("TestTopic");
+
+ invokePrivateMethod(notifier, "refreshSingleRoute", "TestTopic");
+
+ Field pendingTopicsField = RouteChangeNotifier.class.getDeclaredField("pendingTopics");
+ pendingTopicsField.setAccessible(true);
+ Queue<String> pendingTopics = (Queue<String>) pendingTopicsField.get(notifier);
+
+ assertTrue(pendingTopics.contains("TestTopic"));
+ }
+
+ private void invokePrivateMethod(Object obj, String methodName, Object arg) throws Exception {
+ java.lang.reflect.Method method;
+
+ if (arg instanceof List) {
+ method = obj.getClass().getDeclaredMethod(methodName, List.class);
+ method.setAccessible(true);
+ method.invoke(obj, arg);
+ } else if (arg instanceof String) {
+ method = obj.getClass().getDeclaredMethod(methodName, String.class);
+ method.setAccessible(true);
+ method.invoke(obj, arg);
+ }
+ }
+
+ private void verifyMarkCacheDirtyCalled(String topic) throws Exception {
+ Field pendingTopicsField = RouteChangeNotifier.class.getDeclaredField("pendingTopics");
+ pendingTopicsField.setAccessible(true);
+ Queue<String> pendingTopics = (Queue<String>) pendingTopicsField.get(notifier);
+
+ assertTrue("Topic " + topic + " should be in pending topics",
+ pendingTopics.contains(topic));
+ }
+}