[RIP-81] Master-slave metadata incremental synchronization (#9617)
* [ISSUE #9616] Master-slave metadata incremental synchronization
* commit
* Fix test, adapt topic and subscriptionConfig's rocksdb mode, optimize code
* Fix test
* Fix test
* Fix test
* Fix test
* 1. Simplified log printing
2. Increased snapshot storage interval to reduce the increased garbage collection frequency caused by deep copies
3. Added DataVersion transmission
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7b1701c..32be940 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,8 +16,10 @@
*/
package org.apache.rocketmq.broker;
+import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory;
import org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
import org.apache.rocketmq.auth.authorization.factory.AuthorizationFactory;
@@ -84,6 +86,11 @@
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
+import org.apache.rocketmq.broker.sync.SyncMessageProducer;
+import org.apache.rocketmq.broker.sync.SyncMetadataChangeObserver;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
@@ -110,6 +117,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.stats.MomentStatsItem;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -156,11 +164,19 @@
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;
+import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -238,6 +254,7 @@
protected final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
protected final BlockingQueue<Runnable> adminBrokerThreadPoolQueue;
protected final BlockingQueue<Runnable> loadBalanceThreadPoolQueue;
+ protected final BlockingQueue<Runnable> metadataIncrementalSyncThreadPoolQueue;
protected BrokerStatsManager brokerStatsManager;
protected final List<SendMessageHook> sendMessageHookList = new ArrayList<>();
protected final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
@@ -266,6 +283,7 @@
protected ExecutorService consumerManageExecutor;
protected ExecutorService loadBalanceExecutor;
protected ExecutorService endTransactionExecutor;
+ protected ExecutorService metadataIncrementalSyncExecutor;
protected boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
@@ -300,6 +318,7 @@
private TransactionMetricsFlushService transactionMetricsFlushService;
private AuthenticationMetadataManager authenticationMetadataManager;
private AuthorizationMetadataManager authorizationMetadataManager;
+ private MetadataChangeObserver metadataChangeObserver;
private ConfigContext configContext;
@@ -421,6 +440,14 @@
this.loadBalanceThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getLoadBalanceThreadPoolQueueCapacity());
this.brokerFastFailure = new BrokerFastFailure(this);
+ if (this.brokerConfig.isAllowMetadataIncrementalSync() && this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
+ SyncMessageProducer syncMessageProducer = new SyncMessageProducer(this);
+ this.metadataIncrementalSyncThreadPoolQueue = new LinkedBlockingQueue<>(10);
+ metadataChangeObserver = new SyncMetadataChangeObserver(syncMessageProducer);
+ } else {
+ metadataChangeObserver = new NoopMetadataChangeObserver();
+ this.metadataIncrementalSyncThreadPoolQueue = null;
+ }
String brokerConfigPath;
if (brokerConfig.getBrokerConfigPath() != null && !brokerConfig.getBrokerConfigPath().isEmpty()) {
@@ -637,6 +664,16 @@
this.loadBalanceThreadPoolQueue,
new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
+ if (this.brokerConfig.isAllowMetadataIncrementalSync() && this.brokerConfig.getBrokerId() == 0L) {
+ this.metadataIncrementalSyncExecutor = ThreadUtils.newThreadPoolExecutor(
+ this.brokerConfig.getMetadataIncrementalSyncThreadPoolNums(),
+ this.brokerConfig.getMetadataIncrementalSyncThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.metadataIncrementalSyncThreadPoolQueue,
+ new ThreadFactoryImpl("MetadataIncrementalSyncThread_", getBrokerIdentity()));
+ }
+
this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1,
@@ -747,7 +784,11 @@
public void run() {
try {
if (System.currentTimeMillis() - lastSyncTimeMs > 60 * 1000) {
- BrokerController.this.getSlaveSynchronize().syncAll();
+ if (!getBrokerConfig().isAllowMetadataIncrementalSync()) {
+ BrokerController.this.getSlaveSynchronize().syncAll();
+ } else {
+ BrokerController.this.getSlaveSynchronize().start();
+ }
lastSyncTimeMs = System.currentTimeMillis();
}
@@ -775,12 +816,164 @@
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
+ if (this.brokerConfig.isAllowMetadataIncrementalSync() && this.brokerConfig.getBrokerId() == 0L) {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ this.consumerOffsetManager.checkAndSync();
+ } catch (Throwable e) {
+ LOG.error("Failed to execute scheduled sync task", e);
+ }
+ }, 1000, 1000, TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ deleteOutdatedSnapshot();
+ persistBrokerConfigSnapshotsSeparately();
+ } catch (Throwable e) {
+ LOG.error("Failed to execute scheduled sync task", e);
+ }
+ }, 10, this.brokerConfig.getSnapshotIntervalSeconds(), TimeUnit.SECONDS);
+ }
if (this.brokerConfig.isEnableControllerMode()) {
this.updateMasterHAServerAddrPeriodically = true;
}
}
+
+ /**
+  * Persists one JSON snapshot file per metadata kind (topic config, subscription group,
+  * consumer offset, delay offset, message request mode, timer metrics) under
+  * {@code <storePathRootDir>/snapshot}.
+  *
+  * <p>Each snapshot is captured and written on the metadata incremental sync executor. A failure
+  * to submit or write one snapshot is logged and does not prevent the others from being attempted.
+  * All files produced by a single invocation share the same timestamp suffix.
+  */
+ private void persistBrokerConfigSnapshotsSeparately() {
+     LOG.info("Starting to persist broker config snapshots separately...");
+
+     String snapshotDir = messageStoreConfig.getStorePathRootDir() + File.separator + "snapshot";
+     // SimpleDateFormat is not thread-safe, so a fresh instance is used per run instead of a shared one.
+     String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
+
+     try {
+         Path dirPath = Paths.get(snapshotDir);
+         if (Files.notExists(dirPath)) {
+             Files.createDirectories(dirPath);
+         }
+     } catch (Throwable e) {
+         // Without the directory none of the snapshots can be written, so give up for this round.
+         LOG.error("Failed to create snapshot directory", e);
+         return;
+     }
+
+     submitNamedSnapshotTask(MixAll.SNAPSHOT_NAME_TOPIC_CONFIG, snapshotDir, timestamp, () -> {
+         long maxOffset = this.messageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, 0);
+         return Triple.of(topicConfigManager.deepCopyTopicConfigTable(), topicConfigManager.getDataVersion(), maxOffset);
+     });
+
+     submitNamedSnapshotTask(MixAll.SNAPSHOT_NAME_SUBSCRIPTION_GROUP, snapshotDir, timestamp, () -> {
+         long maxOffset = this.messageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, 0);
+         return Triple.of(subscriptionGroupManager.deepCopySubscriptionGroupTable(), subscriptionGroupManager.getDataVersion(), maxOffset);
+     });
+
+     submitNamedSnapshotTask(MixAll.SNAPSHOT_NAME_CONSUMER_OFFSET, snapshotDir, timestamp, () -> {
+         long maxOffset = this.messageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, 0);
+         return Triple.of(consumerOffsetManager.deepCopyOffsetTableSnapshot(), consumerOffsetManager.getDataVersion(), maxOffset);
+     });
+
+     submitNamedSnapshotTask(MixAll.SNAPSHOT_NAME_DELAY_OFFSET, snapshotDir, timestamp, () -> {
+         long maxOffset = this.messageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, 0);
+         return Triple.of(this.getScheduleMessageService().deepCopyDelayOffsetTable(), this.getScheduleMessageService().getDataVersion(), maxOffset);
+     });
+
+     submitNamedSnapshotTask(MixAll.SNAPSHOT_NAME_MESSAGE_MODE, snapshotDir, timestamp, () -> {
+         long maxOffset = this.messageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, 0);
+         return Pair.of(this.queryAssignmentProcessor.getMessageRequestModeManager().deepCopyMessageRequestModeMapSnapshot(), maxOffset);
+     });
+
+     // The timer message store is only installed when the timer wheel is enabled; skip the
+     // snapshot instead of failing with a NullPointerException on every run.
+     if (this.messageStore.getTimerMessageStore() != null) {
+         submitNamedSnapshotTask(MixAll.SNAPSHOT_NAME_TIMER_METRICS, snapshotDir, timestamp, () -> {
+             long maxOffset = this.messageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, 0);
+             return Triple.of(this.messageStore.getTimerMessageStore().getTimerMetrics().deepCopyTimingCountSnapshot(), this.messageStore.getTimerMessageStore().getTimerMetrics().getDataVersion(), maxOffset);
+         });
+     } else {
+         LOG.debug("Skip {} snapshot because no timer message store is available", MixAll.SNAPSHOT_NAME_TIMER_METRICS);
+     }
+ }
+
+ /** Produces the object to serialize for one snapshot; may fail with any exception. */
+ private interface SnapshotDataSupplier {
+     Object get() throws Exception;
+ }
+
+ /**
+  * Submits a task that captures the data from {@code supplier} and writes it as the snapshot
+  * called {@code snapshotName}. Both submission failures and failures inside the task are logged
+  * with the snapshot name and never propagated to the caller.
+  */
+ private void submitNamedSnapshotTask(String snapshotName, String snapshotDir, String timestamp,
+     SnapshotDataSupplier supplier) {
+     try {
+         submitSnapshotTask(() -> {
+             try {
+                 writeSnapshotToFile(snapshotName, supplier.get(), snapshotDir, timestamp);
+             } catch (Throwable e) {
+                 LOG.error("Failed to persist {} snapshot", snapshotName, e);
+             }
+         });
+     } catch (Throwable e) {
+         // E.g. the executor rejected the task; keep going so the remaining snapshots are still attempted.
+         LOG.error("Failed to submit {} snapshot task", snapshotName, e);
+     }
+ }
+
+ /**
+  * Serializes {@code snapshotData} to JSON and stores it as
+  * {@code <snapshotName>_snapshot_<timestamp>.json} inside {@code snapshotDir}.
+  *
+  * <p>The content is first written to a hidden temporary file in the same directory and then
+  * renamed into place, so a crash or I/O error in the middle of the write never leaves a
+  * truncated file under the final snapshot name.
+  *
+  * @param snapshotName logical name of the snapshot, used as the file name prefix
+  * @param snapshotData object to serialize
+  * @param snapshotDir  directory that receives the file; must already exist
+  * @param timestamp    suffix that distinguishes snapshots taken at different times
+  * @throws Exception if serialization, writing or renaming fails
+  */
+ private void writeSnapshotToFile(String snapshotName, Object snapshotData, String snapshotDir, String timestamp) throws Exception {
+     String jsonSnapshot = JSON.toJSONString(snapshotData);
+     String filename = String.format("%s_snapshot_%s.json", snapshotName, timestamp);
+     Path filePath = Paths.get(snapshotDir, filename);
+     // The leading dot keeps the temporary file from matching the "<snapshotName>..." prefix
+     // that identifies finished snapshots.
+     Path tempPath = Paths.get(snapshotDir, "." + filename + ".tmp");
+
+     boolean published = false;
+     try {
+         Files.write(tempPath, jsonSnapshot.getBytes(StandardCharsets.UTF_8));
+         try {
+             Files.move(tempPath, filePath, java.nio.file.StandardCopyOption.ATOMIC_MOVE);
+         } catch (java.nio.file.AtomicMoveNotSupportedException e) {
+             // Fall back to a plain replace on file systems without atomic rename support.
+             Files.move(tempPath, filePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+         }
+         published = true;
+     } finally {
+         if (!published) {
+             try {
+                 Files.deleteIfExists(tempPath);
+             } catch (IOException ignored) {
+                 // Best-effort cleanup; the original failure is the one worth reporting.
+             }
+         }
+     }
+ }
+
+ /**
+  * Hands a snapshot task over to the metadata incremental sync executor.
+  *
+  * @param task work to run on the executor
+  */
+ private void submitSnapshotTask(Runnable task) {
+     final ExecutorService snapshotExecutor = this.metadataIncrementalSyncExecutor;
+     snapshotExecutor.submit(task);
+ }
+
+
+ /**
+  * Removes old snapshot files from {@code <storePathRootDir>/snapshot}, keeping only the most
+  * recently modified file for every name in {@code MixAll.SNAPSHOT_NAMES}.
+  *
+  * <p>Failures are logged and never propagated; a problem with one snapshot name does not stop
+  * the cleanup of the others.
+  */
+ private void deleteOutdatedSnapshot() {
+     try {
+         String snapshotDir = messageStoreConfig.getStorePathRootDir() + File.separator + "snapshot";
+         Path dirPath = Paths.get(snapshotDir);
+
+         if (Files.notExists(dirPath)) {
+             return;
+         }
+
+         MixAll.SNAPSHOT_NAMES.forEach(name -> {
+             try {
+                 deleteAllButNewestSnapshot(dirPath, name);
+             } catch (IOException | java.io.UncheckedIOException e) {
+                 LOG.warn("Failed to list snapshot files for {}", name, e);
+             }
+         });
+
+     } catch (Throwable e) {
+         LOG.error("Error in snapshot cleanup", e);
+     }
+ }
+
+ /**
+  * Deletes every file in {@code dirPath} whose name starts with {@code name}, except the one
+  * with the latest modification time.
+  */
+ private void deleteAllButNewestSnapshot(Path dirPath, String name) throws IOException {
+     List<Path> snapshotFiles;
+     // Files.list keeps a directory handle open until the stream is closed.
+     try (java.util.stream.Stream<Path> entries = Files.list(dirPath)) {
+         snapshotFiles = entries
+             .filter(path -> path.getFileName().toString().startsWith(name))
+             .collect(Collectors.toList());
+     }
+     if (snapshotFiles.size() <= 1) {
+         return;
+     }
+
+     // Read each modification time exactly once so the ordering stays consistent during the sort.
+     Map<Path, Long> modifiedMillis = new java.util.HashMap<>();
+     for (Path file : snapshotFiles) {
+         modifiedMillis.put(file, lastModifiedMillisOrOldest(file));
+     }
+     List<Path> newestFirst = new ArrayList<>(snapshotFiles);
+     newestFirst.sort((p1, p2) -> Long.compare(modifiedMillis.get(p2), modifiedMillis.get(p1)));
+
+     for (int i = 1; i < newestFirst.size(); i++) {
+         try {
+             Files.delete(newestFirst.get(i));
+         } catch (IOException e) {
+             LOG.warn("Failed to delete {}", newestFirst.get(i), e);
+         }
+     }
+ }
+
+ /**
+  * Returns the modification time of {@code file} in milliseconds, or {@code Long.MIN_VALUE}
+  * when it cannot be read so that such a file is never chosen as the snapshot to keep.
+  */
+ private long lastModifiedMillisOrOldest(Path file) {
+     try {
+         return Files.getLastModifiedTime(file).toMillis();
+     } catch (IOException e) {
+         return Long.MIN_VALUE;
+     }
+ }
+
+
protected void initializeScheduledTasks() {
initializeBrokerScheduledTasks();
@@ -862,7 +1055,7 @@
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
if (messageStoreConfig.isTimerWheelEnable()) {
this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
- TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
+ TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()), metadataChangeObserver);
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
@@ -1381,6 +1574,14 @@
this.subscriptionGroupManager = subscriptionGroupManager;
}
+ /**
+  * @return the observer currently registered on this controller for metadata change notifications
+  */
+ public MetadataChangeObserver getMetadataChangeObserver() {
+     return this.metadataChangeObserver;
+ }
+
+ /**
+  * Replaces the observer that this controller hands out for metadata change notifications.
+  *
+  * @param metadataChangeObserver the observer to use from now on
+  */
+ public void setMetadataChangeObserver(final MetadataChangeObserver metadataChangeObserver) {
+     this.metadataChangeObserver = metadataChangeObserver;
+ }
+
public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}
@@ -1567,7 +1768,7 @@
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
-
+
if (this.loadBalanceExecutor != null) {
this.loadBalanceExecutor.shutdown();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index f7e0de9..e414263 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -128,8 +128,7 @@
@Override
public SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
String groupName = subscriptionGroupConfig.getGroupName();
- SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
-
+ SubscriptionGroupConfig oldConfig = super.putSubscriptionGroupConfig(subscriptionGroupConfig);
try {
byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
@@ -143,7 +142,7 @@
@Override
protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) {
String groupName = subscriptionGroupConfig.getGroupName();
- SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.putIfAbsent(groupName, subscriptionGroupConfig);
+ SubscriptionGroupConfig oldConfig = super.putSubscriptionGroupConfigIfAbsent(subscriptionGroupConfig);
if (oldConfig == null) {
try {
byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
@@ -158,7 +157,7 @@
@Override
protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
- SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.remove(groupName);
+ SubscriptionGroupConfig subscriptionGroupConfig = super.removeSubscriptionGroupConfig(groupName);
try {
this.rocksDBConfigManager.delete(groupName.getBytes(DataConverter.CHARSET_UTF8));
} catch (Exception e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index d64f808..591cac1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -110,7 +110,8 @@
@Override
public TopicConfig putTopicConfig(TopicConfig topicConfig) {
String topicName = topicConfig.getTopicName();
- TopicConfig oldTopicConfig = this.topicConfigTable.put(topicName, topicConfig);
+ TopicConfig oldTopicConfig = super.putTopicConfig(topicConfig);
+
try {
byte[] keyBytes = topicName.getBytes(DataConverter.CHARSET_UTF8);
byte[] valueBytes = JSON.toJSONBytes(topicConfig, SerializerFeature.BrowserCompatible);
@@ -123,7 +124,7 @@
@Override
protected TopicConfig removeTopicConfig(String topicName) {
- TopicConfig topicConfig = this.topicConfigTable.remove(topicName);
+ TopicConfig topicConfig = super.removeTopicConfig(topicName);
try {
this.rocksDBConfigManager.delete(topicName.getBytes(DataConverter.CHARSET_UTF8));
} catch (Exception e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index f22f22a..7dc526e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -354,7 +354,11 @@
slaveSyncFuture = this.brokerController.getScheduledExecutorService().scheduleAtFixedRate(() -> {
try {
if (System.currentTimeMillis() - lastSyncTimeMs > 10 * 1000) {
- brokerController.getSlaveSynchronize().syncAll();
+ if (!this.brokerController.getBrokerConfig().isAllowMetadataIncrementalSync()) {
+ this.brokerController.getSlaveSynchronize().syncAll();
+ } else {
+ this.brokerController.getSlaveSynchronize().start();
+ }
lastSyncTimeMs = System.currentTimeMillis();
}
//timer checkpoint, latency-sensitive, so sync it more frequently
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
index e6cb976..7ed3b97 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
@@ -114,7 +114,11 @@
public void run() {
try {
if (System.currentTimeMillis() - lastSyncTimeMs > 10 * 1000) {
- brokerController.getSlaveSynchronize().syncAll();
+ if (!brokerController.getBrokerConfig().isAllowMetadataIncrementalSync()) {
+ brokerController.getSlaveSynchronize().syncAll();
+ } else {
+ brokerController.getSlaveSynchronize().start();
+ }
lastSyncTimeMs = System.currentTimeMillis();
}
//timer checkpoint, latency-sensitive, so sync it more frequently
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
index 0c69e2d..d86d226 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
@@ -16,10 +16,12 @@
*/
package org.apache.rocketmq.broker.loadbalance;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
@@ -42,15 +44,34 @@
ConcurrentHashMap<String, SetMessageRequestModeRequestBody> consumerGroup2ModeMap = messageRequestModeMap.get(topic);
if (consumerGroup2ModeMap == null) {
consumerGroup2ModeMap = new ConcurrentHashMap<>();
- ConcurrentHashMap<String, SetMessageRequestModeRequestBody> pre =
- messageRequestModeMap.putIfAbsent(topic, consumerGroup2ModeMap);
- if (pre != null) {
- consumerGroup2ModeMap = pre;
+ ConcurrentHashMap<String, SetMessageRequestModeRequestBody>[] pre = new ConcurrentHashMap[1];
+ ConcurrentHashMap<String, SetMessageRequestModeRequestBody> finalConsumerGroup2ModeMap = consumerGroup2ModeMap;
+ messageRequestModeMap.compute(topic, (key, existingValue) -> {
+ if (existingValue == null) {
+ notifyMessageRequestModeCreated(requestBody);
+ pre[0] = null;
+ return finalConsumerGroup2ModeMap;
+ } else {
+ notifyMessageRequestModeUpdated(requestBody);
+ pre[0] = existingValue;
+ return existingValue;
+ }
+ });
+ if (pre[0] != null) {
+ consumerGroup2ModeMap = pre[0];
}
}
consumerGroup2ModeMap.put(consumerGroup, requestBody);
}
+ /**
+  * Reports {@code requestBody} to the broker's metadata change observer as a newly created
+  * entry, keyed by the topic carried in the request body.
+  */
+ private void notifyMessageRequestModeCreated(SetMessageRequestModeRequestBody requestBody) {
+     this.brokerController
+         .getMetadataChangeObserver()
+         .onCreated(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, requestBody.getTopic(), requestBody);
+ }
+
+ /**
+  * Reports {@code requestBody} to the broker's metadata change observer as an update of an
+  * existing entry, keyed by the topic carried in the request body.
+  */
+ private void notifyMessageRequestModeUpdated(SetMessageRequestModeRequestBody requestBody) {
+     this.brokerController
+         .getMetadataChangeObserver()
+         .onUpdated(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, requestBody.getTopic(), requestBody);
+ }
+
public SetMessageRequestModeRequestBody getMessageRequestMode(String topic, String consumerGroup) {
ConcurrentHashMap<String, SetMessageRequestModeRequestBody> consumerGroup2ModeMap = messageRequestModeMap.get(topic);
if (consumerGroup2ModeMap != null) {
@@ -92,4 +113,31 @@
public String encode(boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}
+
+ /**
+  * Builds an independent copy of the topic -> consumer group -> request mode table.
+  *
+  * <p>Every request body is cloned; if cloning is not supported the original instance is
+  * reused for that entry.
+  *
+  * @return a new two-level map holding the copied entries
+  */
+ public ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> deepCopyMessageRequestModeMapSnapshot() {
+     final ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> snapshot =
+         new ConcurrentHashMap<>(this.messageRequestModeMap.size());
+
+     this.messageRequestModeMap.forEach((topic, groupModes) -> {
+         if (groupModes == null) {
+             return;
+         }
+         final ConcurrentHashMap<String, SetMessageRequestModeRequestBody> copiedGroupModes =
+             new ConcurrentHashMap<>(groupModes.size());
+         groupModes.forEach((consumerGroup, body) -> {
+             if (body == null) {
+                 return;
+             }
+             SetMessageRequestModeRequestBody copy;
+             try {
+                 copy = body.clone();
+             } catch (CloneNotSupportedException e) {
+                 // Fall back to sharing the original instance.
+                 copy = body;
+             }
+             copiedGroupModes.put(consumerGroup, copy);
+         });
+         snapshot.put(topic, copiedGroupModes);
+     });
+
+     return snapshot;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index e1e1cb4..e74428c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.offset;
import com.google.common.collect.Maps;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -25,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Strings;
@@ -36,6 +38,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
@@ -54,7 +57,19 @@
new ConcurrentHashMap<>(512);
private final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> pullOffsetTable =
- new ConcurrentHashMap<>(512);
+ new ConcurrentHashMap<>(512);
+
+ private volatile ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> workingBuffer =
+ new ConcurrentHashMap<>(512);
+
+ private volatile ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> syncBuffer =
+ new ConcurrentHashMap<>(512);
+
+ private final Object bufferSwapLock = new Object();
+
+ private transient ScheduledExecutorService scheduledExecutorService;
+
+ private final transient AtomicLong lastSyncTimestamp = new AtomicLong(System.currentTimeMillis());
protected transient BrokerController brokerController;
@@ -70,6 +85,36 @@
protected void removeConsumerOffset(String topicAtGroup) {
}
+ /**
+  * Detaches the offsets accumulated in {@code workingBuffer} and publishes them to the metadata
+  * change observer as one consumer-offset update.
+  *
+  * <p>Under {@code bufferSwapLock} the current working buffer becomes the batch to publish and a
+  * fresh working buffer is installed. Entries still present in the previous sync buffer are
+  * folded into the batch only where the batch has no value for that queue, so an older offset
+  * never replaces a newer one.
+  *
+  * <p>After the lock is released the method works exclusively on its own batch reference. It
+  * does not re-read the shared {@code syncBuffer} field, which a concurrent caller may already
+  * have replaced; otherwise one caller could publish or clear another caller's batch.
+  *
+  * <p>NOTE(review): a writer that read the old {@code workingBuffer} reference just before the
+  * swap can still add an entry to the batch after it was published; closing that window needs
+  * coordination on the writer side as well.
+  */
+ private void triggerBufferSwapAndSync() {
+     final ConcurrentMap<String, ConcurrentMap<Integer, Long>> batch;
+     synchronized (bufferSwapLock) {
+         ConcurrentMap<String, ConcurrentMap<Integer, Long>> oldSyncBuffer = this.syncBuffer;
+         batch = this.workingBuffer;
+         this.syncBuffer = batch;
+         this.workingBuffer = new ConcurrentHashMap<>(512);
+         if (oldSyncBuffer != null && !oldSyncBuffer.isEmpty()) {
+             oldSyncBuffer.forEach((key, queueOffsets) -> {
+                 ConcurrentMap<Integer, Long> target = batch.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
+                 // The batch holds the more recent writes; only fill in queues it does not know yet.
+                 queueOffsets.forEach(target::putIfAbsent);
+             });
+         }
+     }
+     if (batch.isEmpty()) {
+         return;
+     }
+     try {
+         this.brokerController.getMetadataChangeObserver().onUpdated(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, String.valueOf(dataVersion), batch);
+     } catch (Exception e) {
+         LOG.error("Failed to sync consumer offsets.", e);
+     } finally {
+         this.lastSyncTimestamp.set(System.currentTimeMillis());
+         batch.clear();
+     }
+ }
+
+ /**
+  * Publishes buffered consumer offsets when more than one second has passed since the last
+  * sync and the working buffer contains at least one entry; otherwise does nothing.
+  */
+ public void checkAndSync() {
+     if (workingBuffer.isEmpty()) {
+         return;
+     }
+     final long elapsedMs = System.currentTimeMillis() - lastSyncTimestamp.get();
+     if (elapsedMs <= 1000) {
+         return;
+     }
+     LOG.debug("Consumer offset sync triggered by time interval.");
+     this.triggerBufferSwapAndSync();
+ }
public void cleanOffset(String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
@@ -216,9 +261,12 @@
LOG.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
}
}
+ ConcurrentMap<Integer, Long> workingMap = this.workingBuffer.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32));
+ workingMap.put(queueId, offset);
if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep() == 0) {
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
+ this.triggerBufferSwapAndSync();
}
}
@@ -370,7 +418,10 @@
+ /**
+  * Copies the offsets recorded for {@code topic} and {@code srcGroup} to {@code destGroup}.
+  * Does nothing when the source group has no entry in the offset table for that topic.
+  *
+  * @param srcGroup  consumer group whose offsets are copied
+  * @param destGroup consumer group that receives the copy
+  * @param topic     topic the offsets belong to
+  */
 public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
 ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
 if (offsets != null) {
- this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<>(offsets));
+ ConcurrentMap<Integer, Long> newOffsets = new ConcurrentHashMap<>(offsets);
+ this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, newOffsets);
+ // Stage a separate copy in the working buffer and publish it right away.
+ this.workingBuffer.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<>(newOffsets));
+ this.triggerBufferSwapAndSync();
 }
 }
@@ -418,13 +469,15 @@
}
return removed;
};
-
boolean clearOffset = deleteFunction.apply(this.offsetTable.entrySet().iterator());
+ boolean clearWorking = deleteFunction.apply(this.workingBuffer.entrySet().iterator());
+ boolean clearSync = deleteFunction.apply(this.syncBuffer.entrySet().iterator());
boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator());
boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator());
LOG.info("Consumer offset manager clean group offset, groupName={}, " +
- "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
+ "offsetTable={}, workingBuffer={}, syncBuffer={}, resetOffsetTable={}, pullOffsetTable={}",
+ group, clearOffset, clearWorking, clearSync, clearReset, clearPull);
}
public void assignResetOffset(String topic, String group, int queueId, long offset) {
@@ -443,6 +496,8 @@
// 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
// sense in cases like clients are offline.
offsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
+ workingBuffer.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(queueId, offset);
+ this.triggerBufferSwapAndSync();
}
public boolean hasOffsetReset(String topic, String group, int queueId) {
@@ -463,4 +518,19 @@
return map.remove(queueId);
}
}
+
+ /**
+  * Creates an independent copy of the offset table: both the outer map and every per-queue
+  * offset map are new instances, so later changes to the live table do not affect the result.
+  *
+  * @return a new map from {@code topic@group} to a copy of its queue-id -> offset map
+  */
+ public ConcurrentMap<String, ConcurrentMap<Integer, Long>> deepCopyOffsetTableSnapshot() {
+     final ConcurrentMap<String, ConcurrentMap<Integer, Long>> snapshot =
+         new ConcurrentHashMap<>(this.offsetTable.size());
+     this.offsetTable.forEach((topicAtGroup, queueOffsets) -> {
+         if (queueOffsets == null) {
+             return;
+         }
+         snapshot.put(topicAtGroup, new ConcurrentHashMap<>(queueOffsets));
+     });
+     return snapshot;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 21ba349..329245f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -17,11 +17,14 @@
package org.apache.rocketmq.broker.out;
import com.alibaba.fastjson2.JSON;
+
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -35,6 +38,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+
+import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -100,6 +105,7 @@
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
@@ -1627,6 +1633,271 @@
return pullResult;
}
+ /**
+  * Requests the topic-config snapshot from the broker at {@code brokerAddr}
+  * (request code {@code GET_TOPIC_CONFIG_SNAPSHOT}, 3000 ms timeout) and decodes it.
+  *
+  * @param brokerAddr address of the broker to query
+  * @return a triple of (topic name to config table, data version, offset carried in the snapshot),
+  *         or {@code null} when the response body is empty or does not decode to a snapshot
+  * @throws MQBrokerException if the broker answers with a non-success code
+  * @throws Exception if the remoting call itself fails
+  */
+ public Triple<ConcurrentMap<String, TopicConfig>, DataVersion, Long> getTopicConfigSnapShot(String brokerAddr)
+     throws Exception {
+
+     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG_SNAPSHOT, null);
+     RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     byte[] body = response.getBody();
+     if (body == null || body.length == 0) {
+         return null;
+     }
+
+     String jsonString = new String(body, StandardCharsets.UTF_8);
+
+     Triple<LinkedHashMap<String, Object>, DataVersion, Long> rawSnapshotData =
+         JSON.parseObject(
+             jsonString,
+             new TypeReference<Triple<LinkedHashMap<String, Object>, DataVersion, Long>>() {
+             }
+         );
+     if (rawSnapshotData == null) {
+         // Nothing decodable in the body: report "no snapshot" the same way as an empty body
+         // instead of failing with a NullPointerException below.
+         return null;
+     }
+
+     ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+     LinkedHashMap<String, Object> linkedMap = rawSnapshotData.getLeft();
+
+     if (linkedMap != null) {
+         for (Map.Entry<String, Object> entry : linkedMap.entrySet()) {
+             String topicName = entry.getKey();
+             Object value = entry.getValue();
+             if (value == null) {
+                 continue;
+             }
+             try {
+                 // Round-trip through JSON to turn the loosely typed value into a TopicConfig.
+                 // The parsed object is freshly created, so no extra defensive copy is needed.
+                 TopicConfig topicConfig = JSON.parseObject(JSON.toJSONString(value), TopicConfig.class);
+                 if (topicConfig != null) {
+                     topicConfigTable.put(topicName, topicConfig);
+                 }
+             } catch (Exception e) {
+                 // A single malformed entry is skipped; the rest of the snapshot is still used.
+                 LOGGER.warn("Failed to parse TopicConfig for topic: {}, error: {}", topicName, e.getMessage(), e);
+             }
+         }
+     }
+
+     DataVersion dataVersion =
+         JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+     Long maxOffset =
+         JSON.to(Long.class, rawSnapshotData.getRight());
+
+     return Triple.of(topicConfigTable, dataVersion, maxOffset);
+ }
+
+
+ /**
+  * Requests the consumer-offset snapshot from the broker at {@code brokerAddr}
+  * (request code {@code GET_CONSUMER_OFFSET_SNAPSHOT}, 3000 ms timeout) and decodes it.
+  *
+  * @param brokerAddr address of the broker to query
+  * @return a triple of (offset table, data version, offset carried in the snapshot),
+  *         or {@code null} when the response body is empty or does not decode to a snapshot
+  * @throws MQBrokerException if the broker answers with a non-success code
+  * @throws Exception if the remoting call itself fails
+  */
+ public Triple<ConcurrentMap<String, ConcurrentMap<Integer, Long>>, DataVersion, Long> getConsumerOffsetSnapShot(String brokerAddr)
+     throws Exception {
+
+     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_OFFSET_SNAPSHOT, null);
+     RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     byte[] body = response.getBody();
+     if (body == null || body.length == 0) {
+         return null;
+     }
+     String jsonString = new String(body, StandardCharsets.UTF_8);
+     Triple<LinkedHashMap<String, LinkedHashMap<Integer, Long>>, DataVersion, Long> rawSnapshotData =
+         JSON.parseObject(
+             jsonString,
+             new TypeReference<Triple<LinkedHashMap<String, LinkedHashMap<Integer, Long>>, DataVersion, Long>>() {
+             }
+         );
+     if (rawSnapshotData == null) {
+         // Nothing decodable in the body: treat it like an empty body rather than hitting an NPE.
+         return null;
+     }
+
+     ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
+     LinkedHashMap<String, LinkedHashMap<Integer, Long>> linkedMap = rawSnapshotData.getLeft();
+
+     if (linkedMap != null) {
+         for (Map.Entry<String, LinkedHashMap<Integer, Long>> outerEntry : linkedMap.entrySet()) {
+             LinkedHashMap<Integer, Long> queueOffsets = outerEntry.getValue();
+             if (queueOffsets == null) {
+                 // The ConcurrentHashMap copy constructor rejects null; skip entries without offsets.
+                 continue;
+             }
+             offsetTable.put(outerEntry.getKey(), new ConcurrentHashMap<>(queueOffsets));
+         }
+     }
+     DataVersion dataVersion =
+         JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+     Long maxOffset =
+         JSON.to(Long.class, rawSnapshotData.getRight());
+
+     return Triple.of(offsetTable, dataVersion, maxOffset);
+ }
+
+ /**
+  * Requests the subscription-group snapshot from the broker at {@code brokerAddr}
+  * (request code {@code GET_SUBSCRIPTION_GROUP_SNAPSHOT}, 3000 ms timeout) and decodes it.
+  *
+  * @param brokerAddr address of the broker to query
+  * @return a triple of (group name to config table, data version, offset carried in the snapshot),
+  *         or {@code null} when the response body is empty or does not decode to a snapshot
+  * @throws MQBrokerException if the broker answers with a non-success code
+  * @throws Exception if the remoting call itself fails
+  */
+ public Triple<ConcurrentMap<String, SubscriptionGroupConfig>, DataVersion, Long> getSubscriptionGroupSnapShot(String brokerAddr)
+     throws Exception {
+
+     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTION_GROUP_SNAPSHOT, null);
+     RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     byte[] body = response.getBody();
+     if (body == null || body.length == 0) {
+         return null;
+     }
+
+     String jsonString = new String(body, StandardCharsets.UTF_8);
+     Triple<LinkedHashMap<String, Object>, DataVersion, Long> rawSnapshotData =
+         JSON.parseObject(
+             jsonString,
+             new TypeReference<Triple<LinkedHashMap<String, Object>, DataVersion, Long>>() {
+             }
+         );
+     if (rawSnapshotData == null) {
+         // Nothing decodable in the body: treat it like an empty body rather than hitting an NPE.
+         return null;
+     }
+
+     ConcurrentMap<String, SubscriptionGroupConfig> subGroupTable = new ConcurrentHashMap<>();
+     LinkedHashMap<String, Object> linkedMap = rawSnapshotData.getLeft();
+     if (linkedMap != null) {
+         for (Map.Entry<String, Object> entry : linkedMap.entrySet()) {
+             String groupName = entry.getKey();
+             Object value = entry.getValue();
+             // Round-trip through JSON to turn the loosely typed value into a SubscriptionGroupConfig.
+             SubscriptionGroupConfig newConfig = value == null
+                 ? null
+                 : JSON.parseObject(JSON.toJSONString(value), SubscriptionGroupConfig.class);
+             if (newConfig == null) {
+                 // ConcurrentHashMap rejects null values; an entry without a config carries nothing to apply.
+                 LOGGER.warn("Skip subscription group without config in snapshot: {}", groupName);
+                 continue;
+             }
+             subGroupTable.put(groupName, newConfig);
+         }
+     }
+
+     DataVersion dataVersion =
+         JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+     Long maxOffset =
+         JSON.to(Long.class, rawSnapshotData.getRight());
+
+     return Triple.of(subGroupTable, dataVersion, maxOffset);
+ }
+
+ /**
+  * Requests the delay-offset snapshot from the broker at {@code brokerAddr}
+  * (request code {@code GET_DELAY_OFFSET_SNAPSHOT}, 3000 ms timeout) and decodes it.
+  *
+  * @param brokerAddr address of the broker to query
+  * @return a triple of (key to offset table, data version, offset carried in the snapshot),
+  *         or {@code null} when the response body is empty or does not decode to a snapshot
+  * @throws MQBrokerException if the broker answers with a non-success code
+  * @throws Exception if the remoting call itself fails
+  */
+ public Triple<ConcurrentMap<String, Long>, DataVersion, Long> getDelayOffsetSnapShot(String brokerAddr)
+     throws Exception {
+
+     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_DELAY_OFFSET_SNAPSHOT, null);
+     RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     byte[] body = response.getBody();
+     if (body == null || body.length == 0) {
+         return null;
+     }
+
+     String jsonString = new String(body, StandardCharsets.UTF_8);
+     Triple<LinkedHashMap<String, Object>, DataVersion, Long> rawSnapshotData =
+         JSON.parseObject(
+             jsonString,
+             new TypeReference<Triple<LinkedHashMap<String, Object>, DataVersion, Long>>() {
+             }
+         );
+     if (rawSnapshotData == null) {
+         // Nothing decodable in the body: treat it like an empty body rather than hitting an NPE.
+         return null;
+     }
+
+     ConcurrentMap<String, Long> delayOffsetTable = new ConcurrentHashMap<>();
+     LinkedHashMap<String, Object> linkedMap = rawSnapshotData.getLeft();
+     if (linkedMap != null) {
+         for (Map.Entry<String, Object> entry : linkedMap.entrySet()) {
+             String key = entry.getKey();
+             Object rawOffset = entry.getValue();
+             Long offsetValue = rawOffset == null ? null : JSON.to(Long.class, rawOffset);
+             if (offsetValue == null) {
+                 // ConcurrentHashMap rejects null values; an entry without an offset carries nothing to apply.
+                 LOGGER.warn("Skip delay offset entry without value in snapshot: {}", key);
+                 continue;
+             }
+             delayOffsetTable.put(key, offsetValue);
+         }
+     }
+
+     DataVersion dataVersion =
+         JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+     Long maxOffset =
+         JSON.to(Long.class, rawSnapshotData.getRight());
+
+     return Triple.of(delayOffsetTable, dataVersion, maxOffset);
+ }
+
+
+ /**
+  * Requests the timer-metrics snapshot from the broker at {@code brokerAddr}
+  * (request code {@code GET_TOPIC_METRICS_SNAPSHOT}, 3000 ms timeout) and decodes it.
+  *
+  * @param brokerAddr address of the broker to query
+  * @return a triple of (key to metric table, data version, offset carried in the snapshot),
+  *         or {@code null} when the response body is empty or does not decode to a snapshot
+  * @throws MQBrokerException if the broker answers with a non-success code
+  * @throws Exception if the remoting call itself fails
+  */
+ public Triple<ConcurrentMap<String, TimerMetrics.Metric>, DataVersion, Long> getTopicMetricsSnapShot(String brokerAddr)
+     throws Exception {
+
+     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_METRICS_SNAPSHOT, null);
+     RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     byte[] body = response.getBody();
+     if (body == null || body.length == 0) {
+         return null;
+     }
+
+     String jsonString = new String(body, StandardCharsets.UTF_8);
+     Triple<LinkedHashMap<String, TimerMetrics.Metric>, DataVersion, Long> rawSnapshotData =
+         JSON.parseObject(
+             jsonString,
+             new TypeReference<Triple<LinkedHashMap<String, TimerMetrics.Metric>, DataVersion, Long>>() {
+             }
+         );
+     if (rawSnapshotData == null) {
+         // Nothing decodable in the body: treat it like an empty body rather than hitting an NPE.
+         return null;
+     }
+
+     ConcurrentMap<String, TimerMetrics.Metric> topicMetricsTable = new ConcurrentHashMap<>();
+     LinkedHashMap<String, TimerMetrics.Metric> linkedMap = rawSnapshotData.getLeft();
+     if (linkedMap != null) {
+         for (Map.Entry<String, TimerMetrics.Metric> entry : linkedMap.entrySet()) {
+             // Copy entry by entry: ConcurrentHashMap rejects null values, so a bulk putAll
+             // would fail the whole snapshot on a single empty metric.
+             if (entry.getValue() != null) {
+                 topicMetricsTable.put(entry.getKey(), entry.getValue());
+             }
+         }
+     }
+
+     DataVersion dataVersion =
+         JSON.to(DataVersion.class, rawSnapshotData.getMiddle());
+
+     Long maxOffset =
+         JSON.to(Long.class, rawSnapshotData.getRight());
+
+     return Triple.of(topicMetricsTable, dataVersion, maxOffset);
+ }
+
+
+ /**
+  * Requests the message-request-mode snapshot from the broker at {@code brokerAddr}
+  * (request code {@code GET_MESSAGE_REQUEST_MODE_SNAPSHOT}, 3000 ms timeout) and decodes it.
+  *
+  * @param brokerAddr address of the broker to query
+  * @return a pair of (two-level request-mode map, offset carried in the snapshot),
+  *         or {@code null} when the response body is empty or does not decode to a snapshot
+  * @throws MQBrokerException if the broker answers with a non-success code
+  * @throws Exception if the remoting call itself fails
+  */
+ public Pair<ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>, Long> getSetMessageRequestModeSnapShot(String brokerAddr)
+     throws Exception {
+
+     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MESSAGE_REQUEST_MODE_SNAPSHOT, null);
+     RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);
+
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     byte[] body = response.getBody();
+     if (body == null || body.length == 0) {
+         return null;
+     }
+
+     String jsonString = new String(body, StandardCharsets.UTF_8);
+     Pair<LinkedHashMap<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>>, Long> rawSnapshotData =
+         JSON.parseObject(
+             jsonString,
+             new TypeReference<Pair<LinkedHashMap<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>>, Long>>() {
+             }
+         );
+     if (rawSnapshotData == null) {
+         // Nothing decodable in the body: treat it like an empty body rather than hitting an NPE.
+         return null;
+     }
+
+     ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> requestModeMap = new ConcurrentHashMap<>();
+     LinkedHashMap<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>> linkedMap = rawSnapshotData.getObject1();
+     if (linkedMap != null) {
+         for (Map.Entry<String, LinkedHashMap<String, SetMessageRequestModeRequestBody>> outerEntry : linkedMap.entrySet()) {
+             LinkedHashMap<String, SetMessageRequestModeRequestBody> innerMap = outerEntry.getValue();
+             if (innerMap == null) {
+                 // The ConcurrentHashMap copy constructor rejects null; skip entries without a body map.
+                 continue;
+             }
+             requestModeMap.put(outerEntry.getKey(), new ConcurrentHashMap<>(innerMap));
+         }
+     }
+
+     Long maxOffset =
+         JSON.to(Long.class, rawSnapshotData.getObject2());
+
+     return Pair.of(requestModeMap, maxOffset);
+ }
+
private int getMaxPageSize() {
return 2000;
}
@@ -1634,5 +1905,4 @@
private long getTimeoutMillis() {
return TimeUnit.SECONDS.toMillis(60);
}
-
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 298e239..511765d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -22,11 +22,18 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
+
+import java.io.File;
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -405,6 +412,18 @@
return this.listAcl(ctx, request);
case RequestCode.POP_ROLLBACK:
return this.transferPopToFsStore(ctx, request);
+ case RequestCode.GET_CONSUMER_OFFSET_SNAPSHOT:
+ return this.getConsumerOffsetSnapshot(ctx, request);
+ case RequestCode.GET_SUBSCRIPTION_GROUP_SNAPSHOT:
+ return this.getSubscriptionGroupSnapshot(ctx, request);
+ case RequestCode.GET_TOPIC_CONFIG_SNAPSHOT:
+ return this.getTopicConfigSnapshot(ctx, request);
+ case RequestCode.GET_DELAY_OFFSET_SNAPSHOT:
+ return this.getDelayOffsetSnapshot(ctx, request);
+ case RequestCode.GET_TOPIC_METRICS_SNAPSHOT:
+ return this.getTopicMetricsSnapshot(ctx, request);
+ case RequestCode.GET_MESSAGE_REQUEST_MODE_SNAPSHOT:
+ return this.getMessageRequestModeSnapshot(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
@@ -3408,4 +3427,141 @@
}
return response;
}
+
+
+ /**
+  * Shared implementation of the snapshot request handlers: reads the newest snapshot file
+  * for {@code snapshotName} and wraps it in a response.
+  *
+  * @param snapshotName file-name prefix passed to {@code readLatestSnapshotFile}
+  * @param handlerName  name of the calling handler, used in the error log line
+  * @param displayName  snapshot label used in the "not found" remark
+  * @return SUCCESS with the file bytes as body, or SYSTEM_ERROR with a remark when no file
+  *         is found or reading fails
+  */
+ private RemotingCommand buildSnapshotResponse(String snapshotName, String handlerName, String displayName) {
+     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+     try {
+         byte[] body = readLatestSnapshotFile(snapshotName);
+         if (body != null) {
+             response.setCode(ResponseCode.SUCCESS);
+             response.setBody(body);
+         } else {
+             response.setCode(ResponseCode.SYSTEM_ERROR);
+             response.setRemark("No latest snapshot file found for " + displayName + ".");
+         }
+     } catch (Exception e) {
+         LOGGER.error(handlerName + " error", e);
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark(e.getMessage());
+     }
+     return response;
+ }
+
+ /** Handles {@code GET_MESSAGE_REQUEST_MODE_SNAPSHOT} by returning the newest message-mode snapshot file. */
+ private RemotingCommand getMessageRequestModeSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+     return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_MESSAGE_MODE, "getMessageRequestModeSnapshot", "message_mode");
+ }
+
+ /** Handles {@code GET_TOPIC_METRICS_SNAPSHOT} by returning the newest timer-metrics snapshot file. */
+ private RemotingCommand getTopicMetricsSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+     return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_TIMER_METRICS, "getTopicMetricsSnapshot", "timer_metrics");
+ }
+
+ /** Handles {@code GET_DELAY_OFFSET_SNAPSHOT} by returning the newest delay-offset snapshot file. */
+ private RemotingCommand getDelayOffsetSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+     return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_DELAY_OFFSET, "getDelayOffsetSnapshot", "delay_offset");
+ }
+
+ /** Handles {@code GET_TOPIC_CONFIG_SNAPSHOT} by returning the newest topic-config snapshot file. */
+ private RemotingCommand getTopicConfigSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+     return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_TOPIC_CONFIG, "getTopicConfigSnapshot", "topic_config");
+ }
+
+ /** Handles {@code GET_SUBSCRIPTION_GROUP_SNAPSHOT} by returning the newest subscription-group snapshot file. */
+ private RemotingCommand getSubscriptionGroupSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+     return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_SUBSCRIPTION_GROUP, "getSubscriptionGroupSnapshot", "subscription_group");
+ }
+
+ /** Handles {@code GET_CONSUMER_OFFSET_SNAPSHOT} by returning the newest consumer-offset snapshot file. */
+ private RemotingCommand getConsumerOffsetSnapshot(ChannelHandlerContext ctx, RemotingCommand request) {
+     return buildSnapshotResponse(MixAll.SNAPSHOT_NAME_CONSUMER_OFFSET, "getConsumerOffsetSnapshot", "consumer_offset");
+ }
+
+
+ /**
+  * Reads the newest snapshot file for {@code snapshotName} from the {@code snapshot}
+  * directory under the store root. Candidates are files whose name starts with
+  * {@code snapshotName + "_snapshot_"}; the one whose file name sorts last as a string is chosen.
+  *
+  * @param snapshotName snapshot family name used as the file-name prefix
+  * @return the bytes of the selected file, or {@code null} if the directory or a matching file is missing
+  * @throws IOException if listing the directory or reading the file fails
+  */
+ private byte[] readLatestSnapshotFile(String snapshotName) throws IOException {
+     String snapshotDir = brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "snapshot";
+     Path dirPath = Paths.get(snapshotDir);
+
+     if (Files.notExists(dirPath)) {
+         LOGGER.warn("Snapshot directory does not exist: {}", dirPath);
+         return null;
+     }
+
+     final String filePrefix = snapshotName + "_snapshot_";
+     Path latestFile;
+     // Files.list keeps the directory open until the stream is closed, so it must be
+     // closed explicitly; otherwise every request leaks a directory handle.
+     try (java.util.stream.Stream<Path> candidates = Files.list(dirPath)) {
+         latestFile = candidates
+             .filter(path -> path.getFileName().toString().startsWith(filePrefix))
+             .max(Comparator.comparing(path -> path.getFileName().toString()))
+             .orElse(null);
+     }
+
+     if (latestFile != null) {
+         return Files.readAllBytes(latestFile);
+     } else {
+         LOGGER.warn("No snapshot file found for: {}", snapshotName);
+         return null;
+     }
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index bec75fe..08855b1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -116,13 +116,28 @@
}
private void updateOffset(int delayLevel, long offset) {
- this.offsetTable.put(delayLevel, offset);
+ this.offsetTable.compute(delayLevel, (key, existingConfig) -> {
+ if (existingConfig == null) {
+ notifyDelayOffsetCreated(delayLevel, offset);
+ } else {
+ notifyDelayOffsetUpdated(delayLevel, offset);
+ }
+ return offset;
+ });
if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) {
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
}
}
+ /** Reports a newly created delay-level offset to the metadata change observer, keyed by the level number. */
+ private void notifyDelayOffsetCreated(int delayLevel, long offset) {
+     final String levelKey = String.valueOf(delayLevel);
+     brokerController.getMetadataChangeObserver()
+         .onCreated(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, levelKey, offset);
+ }
+
+ /** Reports a changed delay-level offset to the metadata change observer, keyed by the level number. */
+ private void notifyDelayOffsetUpdated(int delayLevel, long offset) {
+     final String levelKey = String.valueOf(delayLevel);
+     brokerController.getMetadataChangeObserver()
+         .onUpdated(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, levelKey, offset);
+ }
+
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
Long time = this.delayLevelTable.get(delayLevel);
if (time != null) {
@@ -268,6 +283,18 @@
return true;
}
+ /**
+  * Returns a new map holding the current delay-level offsets.
+  *
+  * @return an independent copy of the offset table; entries with a null offset are left out
+  */
+ public ConcurrentMap<Integer, Long> deepCopyDelayOffsetTable() {
+     final ConcurrentMap<Integer, Long> copy = new ConcurrentHashMap<>(this.offsetTable.size());
+     this.offsetTable.forEach((level, offset) -> {
+         if (offset != null) {
+             copy.put(level, offset);
+         }
+     });
+     return copy;
+ }
+
@Override
public String configFilePath() {
return StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStore().getMessageStoreConfig()
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 2e31340..a371562 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,21 +17,37 @@
package org.apache.rocketmq.broker.slave;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.sync.MetadataChangeInfo;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
@@ -39,6 +55,7 @@
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMetrics;
@@ -46,11 +63,335 @@
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private volatile String masterAddr = null;
+ private boolean isIncrementSyncRunning = false;
+
+ private DefaultLitePullConsumer incrementSyncConsumer;
+
+ private long topicConfigSyncOffset = 0;
+ private long consumerOffsetSyncOffset = 0;
+ private long subscriptionGroupSyncOffset = 0;
+ private long delayOffsetSyncOffset = 0;
+ private long messageRequestModeSyncOffset = 0;
+ private long timerMetricsSyncOffset = 0;
+
public SlaveSynchronize(BrokerController brokerController) {
this.brokerController = brokerController;
}
+ /**
+  * Runs the incremental metadata synchronization: creates the pull consumer, performs the
+  * snapshot sync and then consumes change messages.
+  *
+  * <p>This call blocks inside {@code startIncrementalSync} for as long as the consumer is
+  * running. It is a no-op when a sync run is already in progress. Failures are logged and
+  * not propagated.
+  */
+ public void start() {
+     if (isIncrementSyncRunning) {
+         return;
+     }
+     isIncrementSyncRunning = true;
+     try {
+         initIncrementSyncConsumer();
+         // Returns only once the consumer is no longer running (it is shut down on the way out).
+         startIncrementalSync(this::syncAllMetadataSnapshots);
+         LOGGER.info("Slave synchronize service stopped.");
+     } catch (Exception e) {
+         LOGGER.error("Failed to start slave synchronize service", e);
+     } finally {
+         // The loop has ended on every path that reaches here; clear the flag so that
+         // a later start() is not silently ignored.
+         isIncrementSyncRunning = false;
+     }
+ }
+
+ /**
+  * Creates the lite pull consumer used for incremental sync and subscribes it to the
+  * metadata sync topics; the timer-metrics topic is added only when the timer wheel is enabled.
+  *
+  * @throws MQClientException if a subscription fails
+  */
+ private void initIncrementSyncConsumer() throws MQClientException {
+     this.incrementSyncConsumer = new DefaultLitePullConsumer(MixAll.SLAVE_INCREMENT_SYNC_CONSUMER_GROUP);
+     this.incrementSyncConsumer.setNamesrvAddr(this.brokerController.getBrokerConfig().getNamesrvAddr());
+
+     final String[] syncTopics = {
+         TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC,
+         TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC,
+         TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC,
+         TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC,
+         TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC
+     };
+     for (String syncTopic : syncTopics) {
+         this.incrementSyncConsumer.subscribe(syncTopic);
+     }
+
+     final boolean timerWheelEnabled = brokerController.getMessageStoreConfig().isTimerWheelEnable();
+     if (timerWheelEnabled) {
+         this.incrementSyncConsumer.subscribe(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC);
+     }
+ }
+
+ /**
+  * Starts the pull consumer, seeks it to the offsets returned by the snapshot callback and
+  * then polls and applies metadata change messages while the consumer is running. When the
+  * consumer is detected to lag too far behind, the snapshot callback is invoked again and
+  * the consumer is re-seeked. The consumer is shut down when this method exits.
+  *
+  * @param syncSnapshotCallback performs a snapshot sync and returns the queue offsets to resume from
+  * @throws RuntimeException wrapping any failure other than a per-message handling error
+  */
+ public void startIncrementalSync(Callable<Map<MessageQueue, Long>> syncSnapshotCallback) {
+     try {
+         this.incrementSyncConsumer.start();
+         setConsumerOffset(syncSnapshotCallback.call());
+         while (this.incrementSyncConsumer.isRunning()) {
+             final List<MessageExt> polled = this.incrementSyncConsumer.poll(1000 * 3);
+             final boolean hasMessages = polled != null && !polled.isEmpty();
+             if (hasMessages) {
+                 for (MessageExt changeMessage : polled) {
+                     try {
+                         handleMetadataMessage(changeMessage);
+                     } catch (Exception e) {
+                         // One bad message must not stop the loop.
+                         LOGGER.error("Failed to handle metadata message", e);
+                     }
+                 }
+             }
+             if (!isLaggingTooFarBehind()) {
+                 continue;
+             }
+             LOGGER.info("Incremental sync consumer is lagging too far behind");
+             setConsumerOffset(syncSnapshotCallback.call());
+         }
+     } catch (Exception e) {
+         LOGGER.error("Failed to start incremental sync consumer", e);
+         throw new RuntimeException(e);
+     } finally {
+         this.incrementSyncConsumer.shutdown();
+     }
+ }
+
+ /**
+  * Seeks the incremental sync consumer to the given offset on every listed queue.
+  *
+  * @param consumerOffsets target offset per message queue
+  * @throws MQClientException if a seek fails
+  */
+ private void setConsumerOffset(Map<MessageQueue, Long> consumerOffsets) throws MQClientException {
+     for (Map.Entry<MessageQueue, Long> target : consumerOffsets.entrySet()) {
+         this.incrementSyncConsumer.seek(target.getKey(), target.getValue());
+     }
+ }
+
+ /**
+  * Tells whether any metadata sync queue has moved further ahead of the locally recorded
+  * sync offset than the configured lag threshold.
+  *
+  * @return {@code true} if at least one queue exceeds the threshold, or if a max offset
+  *         cannot be read from the message store
+  */
+ private boolean isLaggingTooFarBehind() {
+     final int threshold = brokerController.getBrokerConfig().getIncrementalSyncConsumerLagThreshold();
+     try {
+         return lagExceeds(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, topicConfigSyncOffset, threshold)
+             || lagExceeds(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, consumerOffsetSyncOffset, threshold)
+             || lagExceeds(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, subscriptionGroupSyncOffset, threshold)
+             || lagExceeds(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, delayOffsetSyncOffset, threshold)
+             || lagExceeds(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, messageRequestModeSyncOffset, threshold)
+             || lagExceeds(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, timerMetricsSyncOffset, threshold);
+     } catch (ConsumeQueueException e) {
+         // Still answer "lagging", but leave a trace so repeated reports of lag can be diagnosed.
+         LOGGER.warn("Failed to read max offset of a metadata sync queue, treating consumer as lagging", e);
+         return true;
+     }
+ }
+
+ /** Compares the distance between queue 0's max offset of {@code syncTopic} and {@code syncedOffset} with {@code threshold}. */
+ private boolean lagExceeds(String syncTopic, long syncedOffset, int threshold) throws ConsumeQueueException {
+     long maxOffset = brokerController.getMessageStore().getMaxOffsetInQueue(syncTopic, 0);
+     return (maxOffset - syncedOffset) > threshold;
+ }
+
+ /**
+  * Decodes one metadata change message, dispatches it to the handler for its topic and then
+  * records the message's queue offset as the sync offset of that topic. Messages of other
+  * topics are ignored.
+  *
+  * @param msg message whose body is a UTF-8 JSON {@code MetadataChangeInfo}
+  */
+ private void handleMetadataMessage(MessageExt msg) {
+     final String payload = new String(msg.getBody(), StandardCharsets.UTF_8);
+     final MetadataChangeInfo changeInfo = JSONObject.parseObject(payload, MetadataChangeInfo.class);
+     final String sourceTopic = msg.getTopic();
+     switch (sourceTopic) {
+         case TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC: {
+             handleTopicConfigChangeInfo(changeInfo);
+             this.topicConfigSyncOffset = msg.getQueueOffset();
+             break;
+         }
+         case TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC: {
+             handleConsumerOffsetChangeInfo(changeInfo);
+             this.consumerOffsetSyncOffset = msg.getQueueOffset();
+             break;
+         }
+         case TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC: {
+             handleSubscriptionGroupChangeInfo(changeInfo);
+             this.subscriptionGroupSyncOffset = msg.getQueueOffset();
+             break;
+         }
+         case TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC: {
+             handleDelayOffsetChangeInfo(changeInfo);
+             this.delayOffsetSyncOffset = msg.getQueueOffset();
+             break;
+         }
+         case TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC: {
+             handleMessageModeChangeInfo(changeInfo);
+             this.messageRequestModeSyncOffset = msg.getQueueOffset();
+             break;
+         }
+         case TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC: {
+             handleTimerMetricsChangeInfo(changeInfo);
+             this.timerMetricsSyncOffset = msg.getQueueOffset();
+             break;
+         }
+         default:
+             break;
+     }
+ }
+
+ /**
+  * Applies a topic-config change to the local topic config table: created/updated entries
+  * are stored under the metadata key, deleted entries are removed.
+  */
+ private void handleTopicConfigChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+     switch (metadataChangeInfo.getChangeType()) {
+         case CREATED:
+         case UPDATED: {
+             final Map<String, TopicConfig> topicTable = this.brokerController.getTopicConfigManager().getTopicConfigTable();
+             final String topicName = metadataChangeInfo.getMetadataKey();
+             final TopicConfig changedConfig = JSON.parseObject(metadataChangeInfo.getMetadataValue(), TopicConfig.class);
+             topicTable.put(topicName, changedConfig);
+             break;
+         }
+         case DELETED: {
+             final Map<String, TopicConfig> topicTable = this.brokerController.getTopicConfigManager().getTopicConfigTable();
+             topicTable.remove(metadataChangeInfo.getMetadataKey());
+             break;
+         }
+         default:
+             break;
+     }
+ }
+
+ /**
+  * Merges the consumer offsets carried by the change message (a JSON map of
+  * topic@group to per-queue offsets) into the local offset table.
+  */
+ private void handleConsumerOffsetChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+     final TypeReference<ConcurrentMap<String, ConcurrentMap<Integer, Long>>> offsetTableType =
+         new TypeReference<ConcurrentMap<String, ConcurrentMap<Integer, Long>>>() {
+         };
+     final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> changedOffsets =
+         JSONObject.parseObject(metadataChangeInfo.getMetadataValue(), offsetTableType);
+     this.brokerController.getConsumerOffsetManager().getOffsetTable().putAll(changedOffsets);
+ }
+
+ /**
+  * Applies a subscription-group change to the local subscription group table:
+  * created/updated entries are stored under the metadata key, deleted entries are removed.
+  */
+ private void handleSubscriptionGroupChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+     switch (metadataChangeInfo.getChangeType()) {
+         case CREATED:
+         case UPDATED: {
+             final Map<String, SubscriptionGroupConfig> groupTable =
+                 this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
+             final String groupName = metadataChangeInfo.getMetadataKey();
+             final SubscriptionGroupConfig changedConfig =
+                 JSON.parseObject(metadataChangeInfo.getMetadataValue(), SubscriptionGroupConfig.class);
+             groupTable.put(groupName, changedConfig);
+             break;
+         }
+         case DELETED: {
+             final Map<String, SubscriptionGroupConfig> groupTable =
+                 this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
+             groupTable.remove(metadataChangeInfo.getMetadataKey());
+             break;
+         }
+         default:
+             break;
+     }
+ }
+
+ /**
+  * Applies a delay-offset change: for created/updated entries the metadata key is parsed as
+  * the delay level and the metadata value as its offset, and the pair is stored in the
+  * schedule service's offset table. Other change types are ignored.
+  */
+ private void handleDelayOffsetChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+     switch (metadataChangeInfo.getChangeType()) {
+         case CREATED:
+         case UPDATED: {
+             final Map<Integer, Long> delayOffsets = this.brokerController.getScheduleMessageService().getOffsetTable();
+             final Integer delayLevel = Integer.valueOf(metadataChangeInfo.getMetadataKey());
+             final long levelOffset = Long.parseLong(metadataChangeInfo.getMetadataValue());
+             delayOffsets.put(delayLevel, levelOffset);
+             break;
+         }
+         default:
+             break;
+     }
+ }
+
+ /**
+  * Applies a message-request-mode change: for created/updated entries the metadata value is
+  * parsed into a map of request-mode bodies, copied into a concurrent map and stored under
+  * the metadata key. Other change types are ignored.
+  */
+ private void handleMessageModeChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+     switch (metadataChangeInfo.getChangeType()) {
+         case CREATED:
+         case UPDATED: {
+             final ConcurrentHashMap<String, SetMessageRequestModeRequestBody> modesCopy = new ConcurrentHashMap<>();
+             final LinkedHashMap<String, SetMessageRequestModeRequestBody> parsedModes = JSONObject.parseObject(
+                 metadataChangeInfo.getMetadataValue(),
+                 new TypeReference<LinkedHashMap<String, SetMessageRequestModeRequestBody>>() {
+                 });
+             if (parsedModes != null) {
+                 modesCopy.putAll(parsedModes);
+             }
+             this.brokerController.getQueryAssignmentProcessor()
+                 .getMessageRequestModeManager()
+                 .getMessageRequestModeMap()
+                 .put(metadataChangeInfo.getMetadataKey(), modesCopy);
+             break;
+         }
+         default:
+             break;
+     }
+ }
+
+ /**
+  * Applies a timer-metrics change: for created/updated entries the metadata value is parsed
+  * as a metric and stored in the timer metrics' timing-count table under the metadata key.
+  * Deletions and other change types are ignored.
+  */
+ private void handleTimerMetricsChangeInfo(MetadataChangeInfo metadataChangeInfo) {
+     switch (metadataChangeInfo.getChangeType()) {
+         case CREATED:
+         case UPDATED: {
+             final TimerMetrics.Metric changedMetric =
+                 JSON.parseObject(metadataChangeInfo.getMetadataValue(), TimerMetrics.Metric.class);
+             this.brokerController.getTimerMessageStore()
+                 .getTimerMetrics()
+                 .getTimingCount()
+                 .put(metadataChangeInfo.getMetadataKey(), changedMetric);
+             break;
+         }
+         case DELETED:
+         default:
+             break;
+     }
+ }
+
+
+    /**
+     * Pulls a full snapshot of each metadata category from the master, applies it locally and records
+     * the offset at which incremental pulling of the matching sync topic should resume.
+     * <p>
+     * For every category the locally tracked sync offset only moves forward: it is replaced, and an entry
+     * is added to the returned map, only when the snapshot offset is greater than the current one. A
+     * {@code null} snapshot leaves that category untouched. Timer metrics are handled only when the timer
+     * wheel is enabled and a timer message store is present.
+     *
+     * @return the new pull offset per sync-topic queue (queue id 0 under this broker's name), containing
+     *         only the categories whose offset advanced
+     * @throws Exception propagated from the snapshot requests or from applying a snapshot
+     */
+    private Map<MessageQueue, Long> syncAllMetadataSnapshots() throws Exception {
+        LOGGER.info("Starting full metadata synchronization via snapshots...");
+        Map<MessageQueue, Long> result = new HashMap<>();
+        final String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
+
+        // NOTE(review): unlike the consumer-offset and subscription-group sections below, the DataVersion
+        // carried by the topic config snapshot is not applied locally - confirm that this is intended.
+        Triple<ConcurrentMap<String, TopicConfig>, DataVersion, Long> topicConfigSnapshot =
+            this.brokerController.getBrokerOuterAPI().getTopicConfigSnapShot(this.masterAddr);
+        if (topicConfigSnapshot != null) {
+            if (this.topicConfigSyncOffset < topicConfigSnapshot.getRight()) {
+                this.brokerController.getTopicConfigManager().setTopicConfigTable(topicConfigSnapshot.getLeft());
+                this.brokerController.getTopicConfigManager().persist();
+                this.topicConfigSyncOffset = topicConfigSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC, brokerName, 0), topicConfigSyncOffset);
+            }
+            // Report the topic-config offset (the consumer-offset field was logged here by mistake).
+            LOGGER.info("Topic config synced. Pull will start from offset: {}", topicConfigSyncOffset);
+        }
+
+        Triple<ConcurrentMap<String, ConcurrentMap<Integer, Long>>, DataVersion, Long> consumerOffsetSnapshot =
+            this.brokerController.getBrokerOuterAPI().getConsumerOffsetSnapShot(this.masterAddr);
+        if (consumerOffsetSnapshot != null) {
+            if (this.consumerOffsetSyncOffset < consumerOffsetSnapshot.getRight()) {
+                this.brokerController.getConsumerOffsetManager().setOffsetTable(consumerOffsetSnapshot.getLeft());
+                this.brokerController.getConsumerOffsetManager().persist();
+                this.consumerOffsetSyncOffset = consumerOffsetSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC, brokerName, 0), consumerOffsetSyncOffset);
+            }
+            this.brokerController.getConsumerOffsetManager().setDataVersion(consumerOffsetSnapshot.getMiddle());
+            LOGGER.info("Consumer offset synced. Pull will start from offset: {}", consumerOffsetSyncOffset);
+        }
+
+        Triple<ConcurrentMap<String, SubscriptionGroupConfig>, DataVersion, Long> subscriptionGroupSnapshot =
+            this.brokerController.getBrokerOuterAPI().getSubscriptionGroupSnapShot(this.masterAddr);
+        if (subscriptionGroupSnapshot != null) {
+            this.brokerController.getSubscriptionGroupManager().setDataVersion(subscriptionGroupSnapshot.getMiddle());
+            if (this.subscriptionGroupSyncOffset < subscriptionGroupSnapshot.getRight()) {
+                this.brokerController.getSubscriptionGroupManager().setSubscriptionGroupTable(subscriptionGroupSnapshot.getLeft());
+                this.brokerController.getSubscriptionGroupManager().persist();
+                this.subscriptionGroupSyncOffset = subscriptionGroupSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC, brokerName, 0), subscriptionGroupSyncOffset);
+            }
+            LOGGER.info("Subscription group synced. Pull will start from offset: {}", subscriptionGroupSyncOffset);
+        }
+
+        Triple<ConcurrentMap<String, Long>, DataVersion, Long> delayOffsetSnapshot =
+            this.brokerController.getBrokerOuterAPI().getDelayOffsetSnapShot(this.masterAddr);
+        if (delayOffsetSnapshot != null) {
+            // The delay offsets are written to the delay-offset store file as JSON and then reloaded
+            // by the schedule service, regardless of whether the sync offset advanced.
+            Map<String, Object> jsonOutput = new LinkedHashMap<>();
+            jsonOutput.put("dataVersion", delayOffsetSnapshot.getMiddle());
+            jsonOutput.put("offsetTable", delayOffsetSnapshot.getLeft());
+            String jsonString = JSON.toJSONString(jsonOutput);
+            MixAll.string2File(jsonString, StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));
+            this.brokerController.getScheduleMessageService().loadWhenSyncDelayOffset();
+            if (this.delayOffsetSyncOffset < delayOffsetSnapshot.getRight()) {
+                this.delayOffsetSyncOffset = delayOffsetSnapshot.getRight();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC, brokerName, 0), delayOffsetSyncOffset);
+            }
+            LOGGER.info("Delay offset synced. Pull will start from offset: {}", delayOffsetSyncOffset);
+        }
+
+        Pair<ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>, Long> messageModeSnapshot =
+            this.brokerController.getBrokerOuterAPI().getSetMessageRequestModeSnapShot(this.masterAddr);
+        if (messageModeSnapshot != null) {
+            if (this.messageRequestModeSyncOffset < messageModeSnapshot.getObject2()) {
+                this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().setMessageRequestModeMap(messageModeSnapshot.getObject1());
+                this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().persist();
+                this.messageRequestModeSyncOffset = messageModeSnapshot.getObject2();
+                result.put(new MessageQueue(TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC, brokerName, 0), messageRequestModeSyncOffset);
+            }
+            LOGGER.info("Message request mode synced. Pull will start from offset: {}", messageRequestModeSyncOffset);
+        }
+
+        if (brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
+            Triple<ConcurrentMap<String, TimerMetrics.Metric>, DataVersion, Long> timerMetricsSnapshot =
+                this.brokerController.getBrokerOuterAPI().getTopicMetricsSnapShot(this.masterAddr);
+            if (timerMetricsSnapshot != null && null != brokerController.getMessageStore().getTimerMessageStore()) {
+                TimerMetrics timerMetrics = this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics();
+                timerMetrics.getDataVersion().assignNewOne(timerMetricsSnapshot.getMiddle());
+                timerMetrics.getTimingCount().clear();
+                timerMetrics.getTimingCount().putAll(timerMetricsSnapshot.getRight() == null ? timerMetricsSnapshot.getLeft() : timerMetricsSnapshot.getLeft());
+                timerMetrics.persist();
+                // Forward-only, like every other category: the offset is no longer overwritten
+                // unconditionally after this guard, which could move it backwards without
+                // reporting the change in the result map.
+                if (this.timerMetricsSyncOffset < timerMetricsSnapshot.getRight()) {
+                    this.timerMetricsSyncOffset = timerMetricsSnapshot.getRight();
+                    result.put(new MessageQueue(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, brokerName, 0), timerMetricsSyncOffset);
+                }
+            }
+        }
+
+        // Logged last so that it also covers the timer-metrics step.
+        LOGGER.info("Full metadata synchronization completed.");
+        return result;
+    }
+
+
+
+
public String getMasterAddr() {
return masterAddr;
}
@@ -63,18 +404,19 @@
}
public void syncAll() {
- this.syncTopicConfig();
- this.syncConsumerOffset();
- this.syncDelayOffset();
- this.syncSubscriptionGroupConfig();
- this.syncMessageRequestMode();
+ this.syncTopicConfigFull(); // runs the full (non-incremental) pull for each metadata category in turn
+ this.syncConsumerOffsetFull();
+ this.syncDelayOffsetFull();
+ this.syncSubscriptionGroupConfigFull();
+ this.syncMessageRequestModeFull();
if (brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
- this.syncTimerMetrics();
+ this.syncTimerMetricsFull(); // timer metrics are pulled only while the timer wheel is enabled
}
}
- private void syncTopicConfig() {
+
+ private void syncTopicConfigFull() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
@@ -117,14 +459,15 @@
this.brokerController.getTopicQueueMappingManager().persist();
}
- LOGGER.info("Update slave topic config from master, {}", masterAddrBak);
+ LOGGER.info("Full sync slave topic config from master, {}", masterAddrBak);
} catch (Exception e) {
LOGGER.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
}
}
}
- private void syncConsumerOffset() {
+
+ private void syncConsumerOffsetFull() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
@@ -134,14 +477,14 @@
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().getDataVersion().assignNewOne(offsetWrapper.getDataVersion());
this.brokerController.getConsumerOffsetManager().persist();
- LOGGER.info("Update slave consumer offset from master, {}", masterAddrBak);
+ LOGGER.info("Full sync slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
LOGGER.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
- private void syncDelayOffset() {
+ private void syncDelayOffsetFull() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
@@ -159,14 +502,14 @@
LOGGER.error("Persist file Exception, {}", fileName, e);
}
}
- LOGGER.info("Update slave delay offset from master, {}", masterAddrBak);
+ LOGGER.info("Full sync slave delay offset from master, {}", masterAddrBak);
} catch (Exception e) {
LOGGER.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
}
}
}
- private void syncSubscriptionGroupConfig() {
+ private void syncSubscriptionGroupConfigFull() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
@@ -197,7 +540,7 @@
subscriptionGroupManager.updateDataVersion();
// persist
subscriptionGroupManager.persist();
- LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak);
+ LOGGER.info("Full sync slave Subscription Group from master, {}", masterAddrBak);
}
} catch (Exception e) {
LOGGER.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
@@ -205,7 +548,7 @@
}
}
- private void syncMessageRequestMode() {
+ private void syncMessageRequestModeFull() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
@@ -225,7 +568,7 @@
curMessageRequestModeMap.putAll(newMessageRequestModeMap);
// persist
messageRequestModeManager.persist();
- LOGGER.info("Update slave Message Request Mode from master, {}", masterAddrBak);
+ LOGGER.info("Full sync slave Message Request Mode from master, {}", masterAddrBak);
} catch (Exception e) {
LOGGER.error("SyncMessageRequestMode Exception, {}", masterAddrBak, e);
}
@@ -251,7 +594,7 @@
}
}
- private void syncTimerMetrics() {
+ private void syncTimerMetricsFull() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index e860f29..c9901a7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -77,63 +77,82 @@
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_SYS_RMQ_TRANS);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
- putSubscriptionGroupConfig(subscriptionGroupConfig);
+ this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
}
}
+    /**
+     * Stores the given config under its group name and publishes a "created" or "updated" change
+     * event, depending on whether a mapping already existed.
+     *
+     * @param subscriptionGroupConfig the config to store
+     * @return the config previously stored under the same group name, or {@code null} if there was none
+     */
public SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
- return this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
+        // Map.put returned the PREVIOUS mapping, whereas compute() returns the NEW one. The previous
+        // value is captured explicitly so callers that test the result for null keep working.
+        final SubscriptionGroupConfig[] previous = new SubscriptionGroupConfig[1];
+        this.subscriptionGroupTable.compute(subscriptionGroupConfig.getGroupName(), (key, existingConfig) -> {
+            previous[0] = existingConfig;
+            if (existingConfig == null) {
+                notifySubscriptionGroupCreated(subscriptionGroupConfig);
+            } else {
+                notifySubscriptionGroupUpdated(subscriptionGroupConfig);
+            }
+            return subscriptionGroupConfig;
+        });
+        return previous[0];
}
+    /**
+     * Stores the given config only when no config is registered under its group name, publishing a
+     * "created" change event in that case. An existing mapping is left untouched and nothing is published.
+     *
+     * @param subscriptionGroupConfig the config to store if absent
+     * @return the config already stored under the same group name, or {@code null} if the given one was stored
+     */
protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) {
- return this.subscriptionGroupTable.putIfAbsent(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig);
+        final SubscriptionGroupConfig[] previous = new SubscriptionGroupConfig[1];
+        this.subscriptionGroupTable.compute(subscriptionGroupConfig.getGroupName(), (key, existingConfig) -> {
+            previous[0] = existingConfig;
+            if (existingConfig != null) {
+                // Nothing is stored, so nothing may be announced: publishing an "updated" event here
+                // would make replicas apply a config that this broker itself discarded.
+                return existingConfig;
+            }
+            notifySubscriptionGroupCreated(subscriptionGroupConfig);
+            return subscriptionGroupConfig;
+        });
+        return previous[0];
}
protected SubscriptionGroupConfig getSubscriptionGroupConfig(String groupName) {
@@ -141,13 +160,52 @@
}
protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
- return this.subscriptionGroupTable.remove(groupName);
+        // Single-element holder: lets the lambda hand the removed config back to the caller.
+        final SubscriptionGroupConfig[] holder = new SubscriptionGroupConfig[1];
+        this.subscriptionGroupTable.computeIfPresent(groupName, (name, current) -> {
+            holder[0] = current;
+            notifySubscriptionGroupDeleted(current);
+            // Returning null tells the map to drop the mapping.
+            return null;
+        });
+        return holder[0];
+    }
+
+    /** Reports the given config as created on the subscription-group sync topic, keyed by group name. */
+    private void notifySubscriptionGroupCreated(SubscriptionGroupConfig config) {
+        final String syncTopic = TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC;
+        brokerController.getMetadataChangeObserver()
+            .onCreated(syncTopic, config.getGroupName(), config);
+    }
+
+    /** Reports the given config as updated on the subscription-group sync topic, keyed by group name. */
+    private void notifySubscriptionGroupUpdated(SubscriptionGroupConfig config) {
+        final String syncTopic = TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC;
+        brokerController.getMetadataChangeObserver()
+            .onUpdated(syncTopic, config.getGroupName(), config);
+    }
+
+    /** Reports the given config as deleted on the subscription-group sync topic, keyed by group name. */
+    private void notifySubscriptionGroupDeleted(SubscriptionGroupConfig config) {
+        final String syncTopic = TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC;
+        brokerController.getMetadataChangeObserver()
+            .onDeleted(syncTopic, config.getGroupName(), config);
}
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
updateSubscriptionGroupConfigWithoutPersist(config);
this.persist();
}
+    /**
+     * Builds a new table that holds a clone of every subscription group config currently registered.
+     * When a config cannot be cloned, the original instance is placed in the copy instead.
+     *
+     * @return a new map keyed by group name; values are clones except where cloning failed
+     */
+    public ConcurrentMap<String, SubscriptionGroupConfig> deepCopySubscriptionGroupTable() {
+        final ConcurrentMap<String, SubscriptionGroupConfig> copy =
+            new ConcurrentHashMap<>(this.subscriptionGroupTable.size());
+        for (Map.Entry<String, SubscriptionGroupConfig> each : this.subscriptionGroupTable.entrySet()) {
+            final SubscriptionGroupConfig source = each.getValue();
+            if (source == null) {
+                continue;
+            }
+            SubscriptionGroupConfig target;
+            try {
+                target = source.clone();
+            } catch (CloneNotSupportedException e) {
+                // Fall back to sharing the original instance.
+                target = source;
+            }
+            copy.put(each.getKey(), target);
+        }
+        return copy;
+    }
public void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
Map<String, String> newAttributes = request(config);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMessageProducer.java b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMessageProducer.java
new file mode 100644
index 0000000..05fd8c5
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMessageProducer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.sync;
+
+import com.alibaba.fastjson2.JSON;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.sync.MetadataChangeInfo;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+
+/**
+ * Writes metadata change events into this broker's own message store, on queue 0 of the given topic.
+ */
+public class SyncMessageProducer {
+    protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    private final int syncQueueId;
+
+    public SyncMessageProducer(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.syncQueueId = 0;
+    }
+
+    /**
+     * Serializes the change to JSON and stores it as a message on {@code targetTopic}. The metadata key
+     * is used as both the message keys and the message tags.
+     *
+     * @param targetTopic the topic the change message is written to
+     * @param changeInfo  the change to serialize as the message body
+     * @throws RuntimeException if the message cannot be built or the store does not answer {@code PUT_OK}
+     */
+    public void sendMetadataChange(final String targetTopic, MetadataChangeInfo changeInfo) {
+        try {
+            byte[] body = JSON.toJSONBytes(changeInfo);
+            MessageExtBrokerInner msg = new MessageExtBrokerInner();
+            msg.setBrokerName(brokerController.getBrokerConfig().getBrokerName());
+            msg.setTopic(targetTopic);
+            msg.setBody(body);
+            msg.setKeys(changeInfo.getMetadataKey());
+            msg.setTags(changeInfo.getMetadataKey());
+            msg.setBornTimestamp(System.currentTimeMillis());
+            msg.setStoreHost(brokerController.getStoreHost());
+            msg.setBornHost(brokerController.getStoreHost());
+            msg.setQueueId(this.syncQueueId);
+            msg.setMsgId(MessageClientIDSetter.createUniqID());
+            PutMessageResult result = this.brokerController.getMessageStore().putMessage(msg);
+            if (result.getPutMessageStatus() != PutMessageStatus.PUT_OK) {
+                throw new RuntimeException("send metadata change failed, topic: " + targetTopic + ", msgId: " + msg.getMsgId() + ", status: " + result.getPutMessageStatus().name());
+            }
+        } catch (RuntimeException e) {
+            // Already unchecked (this includes the status failure raised above): log and rethrow as-is
+            // so the descriptive message is not buried inside a second RuntimeException.
+            LOG.error("send metadata change failed, topic: {}", targetTopic, e);
+            throw e;
+        } catch (Exception e) {
+            LOG.error("send metadata change failed, topic: {}", targetTopic, e);
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMetadataChangeObserver.java b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMetadataChangeObserver.java
new file mode 100644
index 0000000..9b5501e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/sync/SyncMetadataChangeObserver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.sync;
+
+import com.alibaba.fastjson2.JSON;
+import org.apache.rocketmq.common.sync.MetadataChangeInfo;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
+
+/**
+ * {@link MetadataChangeObserver} that converts each metadata change into a {@link MetadataChangeInfo}
+ * carrying the JSON form of the metadata and passes it to a {@link SyncMessageProducer}.
+ */
+public class SyncMetadataChangeObserver implements MetadataChangeObserver {
+
+    private final SyncMessageProducer producer;
+
+    public SyncMetadataChangeObserver(SyncMessageProducer producer) {
+        this.producer = producer;
+    }
+
+    @Override
+    public void onCreated(String targetTopic, String metadataKey, Object newMetadata) {
+        final String payload = JSON.toJSONString(newMetadata);
+        final MetadataChangeInfo changeInfo = MetadataChangeInfo.created(metadataKey, payload);
+        this.producer.sendMetadataChange(targetTopic, changeInfo);
+    }
+
+    @Override
+    public void onUpdated(String targetTopic, String metadataKey, Object newMetadata) {
+        this.producer.sendMetadataChange(targetTopic,
+            MetadataChangeInfo.updated(metadataKey, JSON.toJSONString(newMetadata)));
+    }
+
+    @Override
+    public void onDeleted(String targetTopic, String metadataKey, Object oldMetadata) {
+        // NOTE(review): metadataKey is not forwarded here, unlike onCreated/onUpdated; only the JSON of
+        // the old metadata is handed to MetadataChangeInfo.deleted - confirm this matches its signature.
+        final String payload = JSON.toJSONString(oldMetadata);
+        this.producer.sendMetadataChange(targetTopic, MetadataChangeInfo.deleted(payload));
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index ed46dfd..5841ecb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -92,7 +92,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
@@ -105,7 +105,7 @@
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
{
@@ -114,7 +114,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1024);
topicConfig.setWriteQueueNums(1024);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
@@ -125,7 +125,7 @@
perm |= PermName.PERM_READ | PermName.PERM_WRITE;
}
topicConfig.setPerm(perm);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
@@ -139,7 +139,7 @@
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
topicConfig.setPerm(perm);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT;
@@ -147,7 +147,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
@@ -155,7 +155,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
@@ -164,7 +164,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
{
@@ -173,7 +173,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
// PopAckConstants.REVIVE_TOPIC
@@ -182,7 +182,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum());
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
// sync broker member group topic
@@ -192,7 +192,7 @@
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
topicConfig.setPerm(PermName.PERM_INHERIT);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
// TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
@@ -201,7 +201,7 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
@@ -211,9 +211,62 @@
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
- putTopicConfig(topicConfig);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
-
+ {
+ // TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC
+ String topic = TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ TopicValidator.addSystemTopic(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC
+ String topic = TopicValidator.RMQ_SYS_CONSUMER_OFFSET_SYNC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ TopicValidator.addSystemTopic(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC
+ String topic = TopicValidator.RMQ_SYS_DELAY_OFFSET_SYNC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ TopicValidator.addSystemTopic(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC
+ String topic = TopicValidator.RMQ_SYS_SUBSCRIPTION_GROUP_SYNC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ TopicValidator.addSystemTopic(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC
+ String topic = TopicValidator.RMQ_SYS_MESSAGE_MODE_SYNC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ TopicValidator.addSystemTopic(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+ {
+ // TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC
+ String topic = TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ TopicValidator.addSystemTopic(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
{
if (this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
String topic = TimerMessageStore.TIMER_TOPIC;
@@ -227,15 +280,43 @@
}
+    /**
+     * Stores the given config under its topic name and publishes a "created" or "updated" change
+     * event, depending on whether a mapping already existed.
+     *
+     * @param topicConfig the config to store
+     * @return the config previously stored under the same topic name, or {@code null} if there was none
+     */
public TopicConfig putTopicConfig(TopicConfig topicConfig) {
- return this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        // Map.put returned the PREVIOUS mapping, whereas compute() returns the NEW one. The previous
+        // value is captured explicitly so callers that test the result for null keep working.
+        final TopicConfig[] previous = new TopicConfig[1];
+        this.topicConfigTable.compute(topicConfig.getTopicName(), (key, existingConfig) -> {
+            previous[0] = existingConfig;
+            if (existingConfig == null) {
+                notifyTopicCreated(topicConfig);
+            } else {
+                notifyTopicUpdated(topicConfig);
+            }
+            return topicConfig;
+        });
+        return previous[0];
}
+    /** Reports the given config as created on the topic-config sync topic, keyed by topic name. */
+    private void notifyTopicCreated(TopicConfig topicConfig) {
+        final String syncTopic = TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC;
+        brokerController.getMetadataChangeObserver()
+            .onCreated(syncTopic, topicConfig.getTopicName(), topicConfig);
+    }
+
+    /** Reports the given config as updated on the topic-config sync topic, keyed by topic name. */
+    private void notifyTopicUpdated(TopicConfig topicConfig) {
+        final String syncTopic = TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC;
+        brokerController.getMetadataChangeObserver()
+            .onUpdated(syncTopic, topicConfig.getTopicName(), topicConfig);
+    }
+
+    /** Reports the given config as deleted on the topic-config sync topic, keyed by topic name. */
+    private void notifyTopicDeleted(TopicConfig topicConfig) {
+        final String syncTopic = TopicValidator.RMQ_SYS_TOPIC_CONFIG_SYNC;
+        brokerController.getMetadataChangeObserver()
+            .onDeleted(syncTopic, topicConfig.getTopicName(), topicConfig);
+    }
+
+
protected TopicConfig getTopicConfig(String topicName) {
return this.topicConfigTable.get(topicName);
}
protected TopicConfig removeTopicConfig(String topicName) {
- return this.topicConfigTable.remove(topicName);
+        // Single-element holder: lets the lambda hand the removed config back to the caller.
+        final TopicConfig[] holder = new TopicConfig[1];
+        this.topicConfigTable.computeIfPresent(topicName, (name, current) -> {
+            holder[0] = current;
+            notifyTopicDeleted(current);
+            // Returning null tells the map to drop the mapping.
+            return null;
+        });
+        return holder[0];
}
public TopicConfig selectTopicConfig(final String topic) {
@@ -759,5 +840,21 @@
}
-
+    /**
+     * Builds a new table that holds a clone of every topic config currently registered.
+     * When a config cannot be cloned, the original instance is placed in the copy instead.
+     *
+     * @return a new map keyed by topic name; values are clones except where cloning failed
+     */
+    public ConcurrentMap<String, TopicConfig> deepCopyTopicConfigTable() {
+        final ConcurrentMap<String, TopicConfig> copy =
+            new ConcurrentHashMap<>(this.topicConfigTable.size());
+        for (Map.Entry<String, TopicConfig> each : this.topicConfigTable.entrySet()) {
+            final TopicConfig source = each.getValue();
+            if (source == null) {
+                continue;
+            }
+            TopicConfig target;
+            try {
+                target = source.clone();
+            } catch (CloneNotSupportedException e) {
+                // Fall back to sharing the original instance.
+                target = source;
+            }
+            copy.put(each.getKey(), target);
+        }
+        return copy;
+    }
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
index 4ff8a81..364d5eb 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
@@ -52,6 +53,9 @@
@Mock
private MessageStore messageStore;
+ @Mock
+ private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
@Rule
public TemporaryFolder tf = new TemporaryFolder();
@@ -67,7 +71,7 @@
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setAutoCreateSubscriptionGroup(false);
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
-
+ Mockito.doReturn(syncMetadataChangeObserver).when(controller).getMetadataChangeObserver();
Mockito.doReturn(messageStore).when(controller).getMessageStore();
Mockito.doReturn(1L).when(messageStore).getStateMachineVersion();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
index 731a1f5..d07f34b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.store.MessageStore;
@@ -52,6 +53,9 @@
@Rule
public TemporaryFolder tf = new TemporaryFolder();
+ @Mock
+ private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
@After
public void cleanUp() {
if (null != configStorage) {
@@ -67,7 +71,7 @@
messageStoreConfig = new MessageStoreConfig();
Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig();
Mockito.doReturn(messageStore).when(controller).getMessageStore();
-
+ Mockito.doReturn(syncMetadataChangeObserver).when(controller).getMetadataChangeObserver();
File configStoreDir = tf.newFolder();
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 4fbec13..c89f2b9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -19,6 +19,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.DataVersion;
@@ -59,6 +60,9 @@
@Mock
private DefaultMessageStore defaultMessageStore;
+ @Mock
+ private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
@Before
public void init() {
if (notToBeExecuted()) {
@@ -66,6 +70,7 @@
}
BrokerConfig brokerConfig = new BrokerConfig();
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+ Mockito.doReturn(syncMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(basePath);
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
index 3c975a5..a0a3467 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
import org.apache.rocketmq.common.attribute.BooleanAttribute;
@@ -60,6 +61,9 @@
private BrokerController brokerControllerMock;
private SubscriptionGroupManager subscriptionGroupManager;
+ @Mock
+ private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
@Before
public void before() {
SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
@@ -71,6 +75,9 @@
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(basePath);
Mockito.lenient().when(brokerControllerMock.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+ if (!notToBeExecuted()) {
+ Mockito.doReturn(syncMetadataChangeObserver).when(brokerControllerMock).getMetadataChangeObserver();
+ }
}
@After
@@ -238,4 +245,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index fa3ef95..598f871 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -34,6 +34,7 @@
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.EnumAttribute;
import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -61,6 +62,9 @@
private BrokerController brokerController;
@Mock
+ private NoopMetadataChangeObserver noopMetadataChangeObserver;
+
+ @Mock
private DefaultMessageStore defaultMessageStore;
@Before
@@ -76,6 +80,7 @@
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
topicConfigManager = new RocksDBTopicConfigManager(brokerController);
+ Mockito.doReturn(noopMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
topicConfigManager.load();
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
index e925ed4..2fe205d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -19,6 +19,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -60,6 +61,9 @@
@Mock
private DefaultMessageStore defaultMessageStore;
+ @Mock
+ private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
@Before
public void init() {
if (notToBeExecuted()) {
@@ -72,6 +76,7 @@
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
+ Mockito.doReturn(syncMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
}
@After
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 5b2ea0b..7ebfec1 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
@@ -64,6 +65,9 @@
@Mock
private DefaultMessageStore defaultMessageStore;
+ @Mock
+ private NoopMetadataChangeObserver syncMetadataChangeObserver;
+
@Before
public void init() {
BrokerConfig brokerConfig = new BrokerConfig();
@@ -74,6 +78,7 @@
Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L);
topicConfigManager = new TopicConfigManager(brokerController);
+ Mockito.doReturn(syncMetadataChangeObserver).when(brokerController).getMetadataChangeObserver();
}
@Test
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 04828da..6d83154 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -411,7 +411,7 @@
private boolean usePIDColdCtrStrategy = true;
private long cgColdReadThreshold = 3 * 1024 * 1024;
private long globalColdReadThreshold = 100 * 1024 * 1024;
-
+
/**
* The interval to fetch namesrv addr, default value is 10 second
*/
@@ -479,6 +479,38 @@
private boolean enableCreateSysGroup = true;
+    /**
+     * Switch for master-slave metadata incremental synchronization; off by default.
+     * NOTE(review): meaning inferred from the field name and the commit title - confirm where it is read.
+     */
+    private boolean allowMetadataIncrementalSync = false;
+
+    /**
+     * Snapshot interval in seconds; 600 by default.
+     * NOTE(review): presumably the period between metadata snapshots - confirm against the code that schedules them.
+     */
+    private int snapshotIntervalSeconds = 600;
+
+    /**
+     * Thread count for metadata incremental sync; 1 by default.
+     * NOTE(review): presumably sizes a thread pool, judging by the name - confirm where the pool is created.
+     */
+    private int metadataIncrementalSyncThreadPoolNums = 1;
+
+    /**
+     * Consumer lag threshold for incremental sync; 50 by default.
+     * NOTE(review): unit and the effect of exceeding it are not visible here - confirm with the code that reads it.
+     */
+    private int incrementalSyncConsumerLagThreshold = 50;
+
+    public int getMetadataIncrementalSyncThreadPoolNums() {
+        return metadataIncrementalSyncThreadPoolNums;
+    }
+
+    public void setMetadataIncrementalSyncThreadPoolNums(int metadataIncrementalSyncThreadPoolNums) {
+        this.metadataIncrementalSyncThreadPoolNums = metadataIncrementalSyncThreadPoolNums;
+    }
+
+    public int getSnapshotIntervalSeconds() {
+        return snapshotIntervalSeconds;
+    }
+
+    public void setSnapshotIntervalSeconds(int snapshotIntervalSeconds) {
+        this.snapshotIntervalSeconds = snapshotIntervalSeconds;
+    }
+
+    public boolean isAllowMetadataIncrementalSync() {
+        return allowMetadataIncrementalSync;
+    }
+
+    public void setAllowMetadataIncrementalSync(boolean allowMetadataIncrementalSync) {
+        this.allowMetadataIncrementalSync = allowMetadataIncrementalSync;
+    }
+
public String getConfigBlackList() {
return configBlackList;
}
@@ -1963,11 +1995,11 @@
public void setUseStaticSubscription(boolean useStaticSubscription) {
this.useStaticSubscription = useStaticSubscription;
}
-
+
public long getFetchNamesrvAddrInterval() {
return fetchNamesrvAddrInterval;
}
-
+
public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
}
@@ -2163,4 +2195,12 @@
public void setSplitMetadataSize(int splitMetadataSize) {
this.splitMetadataSize = splitMetadataSize;
}
+
+    /** Returns the configured {@code incrementalSyncConsumerLagThreshold}. */
+    public int getIncrementalSyncConsumerLagThreshold() {
+        return incrementalSyncConsumerLagThreshold;
+    }
+
+    /** Overwrites {@code incrementalSyncConsumerLagThreshold} with the given value; no validation is applied. */
+    public void setIncrementalSyncConsumerLagThreshold(int incrementalSyncConsumerLagThreshold) {
+        this.incrementalSyncConsumerLagThreshold = incrementalSyncConsumerLagThreshold;
+    }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 00006ac..5f37fec 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -120,13 +120,27 @@
public static final String ZONE_MODE = "__ZONE_MODE";
public final static String RPC_REQUEST_HEADER_NAMESPACED_FIELD = "nsd";
public final static String RPC_REQUEST_HEADER_NAMESPACE_FIELD = "ns";
+ public static final String SLAVE_INCREMENT_SYNC_CONSUMER_GROUP = "slave_sync_consumer_group";
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
public static final String METADATA_SCOPE_GLOBAL = "__global__";
public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__syslo__none__";
public static final String MULTI_PATH_SPLITTER = System.getProperty("rocketmq.broker.multiPathSplitter", ",");
-
+ public static final String SNAPSHOT_NAME_TOPIC_CONFIG = "topic_config";
+ public static final String SNAPSHOT_NAME_SUBSCRIPTION_GROUP = "subscription_group";
+ public static final String SNAPSHOT_NAME_CONSUMER_OFFSET = "consumer_offset";
+ public static final String SNAPSHOT_NAME_DELAY_OFFSET = "delay_offset";
+ public static final String SNAPSHOT_NAME_MESSAGE_MODE = "message_mode";
+ public static final String SNAPSHOT_NAME_TIMER_METRICS = "timer_metrics";
+ public static final Set<String> SNAPSHOT_NAMES = ImmutableSet.of(
+ SNAPSHOT_NAME_TOPIC_CONFIG,
+ SNAPSHOT_NAME_SUBSCRIPTION_GROUP,
+ SNAPSHOT_NAME_CONSUMER_OFFSET,
+ SNAPSHOT_NAME_DELAY_OFFSET,
+ SNAPSHOT_NAME_MESSAGE_MODE,
+ SNAPSHOT_NAME_TIMER_METRICS
+ );
private static final String OS = System.getProperty("os.name").toLowerCase();
private static final Set<String> PREDEFINE_GROUP_SET = ImmutableSet.of(
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index 0bf6490..586a756 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -27,7 +27,7 @@
import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;
-public class TopicConfig {
+public class TopicConfig implements Cloneable {
private static final String SEPARATOR = " ";
public static int defaultReadQueueNums = 16;
public static int defaultWriteQueueNums = 16;
@@ -270,4 +270,13 @@
+ ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + order
+ ", attributes=" + attributes + "]";
}
+
+    /**
+     * Copies this config. When {@code attributes} is non-null the copy receives its own
+     * {@link HashMap} holding the same entries; every other field is carried over as-is by
+     * {@code Object.clone()}.
+     *
+     * @return the copied {@code TopicConfig}
+     * @throws CloneNotSupportedException propagated from {@code super.clone()}
+     */
+    @Override
+    public TopicConfig clone() throws CloneNotSupportedException {
+        final TopicConfig copy = (TopicConfig) super.clone();
+        if (null == this.attributes) {
+            return copy;
+        }
+        copy.setAttributes(new HashMap<>(this.attributes));
+        return copy;
+    }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeInfo.java b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeInfo.java
new file mode 100644
index 0000000..0c56039
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sync;
+
+/**
+ * Describes one metadata change: the key, the value, the kind of change and the time the record was built.
+ * Mutable bean (implicit no-arg constructor, getters, setters); the static factories validate their arguments.
+ */
+public class MetadataChangeInfo {
+    private String metadataKey;
+    private String metadataValue;
+    private long timestamp;
+    private ChangeType changeType;
+
+    /** Kind of change a record describes. */
+    public enum ChangeType {
+        CREATED,
+        UPDATED,
+        DELETED
+    }
+
+    /** Builds a CREATED record stamped with the current time; IllegalArgumentException on a null key or value. */
+    public static MetadataChangeInfo created(String metadataKey, String metadataValue) {
+        requireKeyAndValue(metadataKey, metadataValue);
+        return of(metadataKey, metadataValue, ChangeType.CREATED);
+    }
+
+    /** Builds an UPDATED record stamped with the current time; IllegalArgumentException on a null key or value. */
+    public static MetadataChangeInfo updated(String metadataKey, String metadataValue) {
+        requireKeyAndValue(metadataKey, metadataValue);
+        return of(metadataKey, metadataValue, ChangeType.UPDATED);
+    }
+
+    /** Builds a DELETED record (value stays null) stamped with the current time; IllegalArgumentException on a null key. */
+    public static MetadataChangeInfo deleted(String metadataKey) {
+        if (metadataKey == null) {
+            throw new IllegalArgumentException("Metadata key cannot be null");
+        }
+        return of(metadataKey, null, ChangeType.DELETED);
+    }
+
+    /** Shared precondition of created/updated, so the exception type and message live in one place. */
+    private static void requireKeyAndValue(String metadataKey, String metadataValue) {
+        if (metadataKey == null || metadataValue == null) {
+            throw new IllegalArgumentException("Metadata key and value cannot be null");
+        }
+    }
+
+    /** Populates a new instance; every factory funnels through here so all fields are set the same way. */
+    private static MetadataChangeInfo of(String metadataKey, String metadataValue, ChangeType changeType) {
+        MetadataChangeInfo info = new MetadataChangeInfo();
+        info.setMetadataKey(metadataKey);
+        info.setMetadataValue(metadataValue);
+        info.setChangeType(changeType);
+        info.setTimestamp(System.currentTimeMillis());
+        return info;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public ChangeType getChangeType() {
+        return changeType;
+    }
+
+    public void setChangeType(ChangeType changeType) {
+        this.changeType = changeType;
+    }
+
+    public String getMetadataKey() {
+        return metadataKey;
+    }
+
+    public void setMetadataKey(String metadataKey) {
+        this.metadataKey = metadataKey;
+    }
+
+    public String getMetadataValue() {
+        return metadataValue;
+    }
+
+    public void setMetadataValue(String metadataValue) {
+        this.metadataValue = metadataValue;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeObserver.java b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeObserver.java
new file mode 100644
index 0000000..a1ddd62
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sync/MetadataChangeObserver.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sync;
+
+/**
+ * Callback interface through which metadata holders report entry-level changes.
+ *
+ * <p>In every callback {@code targetTopic} names the topic associated with the change (for example,
+ * {@code TimerMetrics} passes {@code TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC}) and {@code metadataKey}
+ * identifies the affected entry.
+ */
+public interface MetadataChangeObserver {
+
+    /**
+     * Reports that an entry was added.
+     *
+     * @param targetTopic topic associated with the change
+     * @param metadataKey key of the new entry
+     * @param newMetadata value stored for the new entry
+     */
+    void onCreated(String targetTopic, String metadataKey, Object newMetadata);
+
+    /**
+     * Reports that an existing entry changed.
+     *
+     * @param targetTopic topic associated with the change
+     * @param metadataKey key of the changed entry
+     * @param newMetadata value of the entry after the change
+     */
+    void onUpdated(String targetTopic, String metadataKey, Object newMetadata);
+
+    /**
+     * Reports that an entry was removed.
+     *
+     * @param targetTopic topic associated with the change
+     * @param metadataKey key of the removed entry
+     * @param oldMetadata presumably the value held before removal - confirm with the callers
+     */
+    void onDeleted(String targetTopic, String metadataKey, Object oldMetadata);
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/sync/NoopMetadataChangeObserver.java b/common/src/main/java/org/apache/rocketmq/common/sync/NoopMetadataChangeObserver.java
new file mode 100644
index 0000000..4240b10
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sync/NoopMetadataChangeObserver.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sync;
+
+/** {@link MetadataChangeObserver} whose callbacks are all empty, so every notification is dropped. */
+public class NoopMetadataChangeObserver implements MetadataChangeObserver {
+
+    @Override
+    public void onCreated(String targetTopic, String metadataKey, Object newMetadata) {
+        // intentionally empty
+    }
+
+    @Override
+    public void onUpdated(String targetTopic, String metadataKey, Object newMetadata) {
+        // intentionally empty
+    }
+
+    @Override
+    public void onDeleted(String targetTopic, String metadataKey, Object oldMetadata) {
+        // intentionally empty
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
index 47d45c6..05d77ee 100644
--- a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
@@ -33,7 +33,12 @@
public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
public static final String RMQ_SYS_ROCKSDB_OFFSET_TOPIC = "CHECKPOINT_TOPIC";
-
+ public static final String RMQ_SYS_TOPIC_CONFIG_SYNC = "RMQ_SYS_TOPIC_CONFIG_SYNC";
+ public static final String RMQ_SYS_CONSUMER_OFFSET_SYNC = "RMQ_SYS_CONSUMER_OFFSET_SYNC";
+ public static final String RMQ_SYS_DELAY_OFFSET_SYNC = "RMQ_SYS_DELAY_OFFSET_SYNC";
+ public static final String RMQ_SYS_SUBSCRIPTION_GROUP_SYNC = "RMQ_SYS_SUBSCRIPTION_GROUP_SYNC";
+ public static final String RMQ_SYS_MESSAGE_MODE_SYNC = "RMQ_SYS_MESSAGE_MODE_SYNC";
+ public static final String RMQ_SYS_TIMER_METRICS_SYNC = "RMQ_SYS_TIMER_METRICS_SYNC";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String SYNC_BROKER_MEMBER_GROUP_PREFIX = SYSTEM_TOPIC_PREFIX + "SYNC_BROKER_MEMBER_";
@@ -64,6 +69,13 @@
SYSTEM_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC);
SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
SYSTEM_TOPIC_SET.add(RMQ_SYS_ROCKSDB_OFFSET_TOPIC);
+ SYSTEM_TOPIC_SET.add(RMQ_SYS_TOPIC_CONFIG_SYNC);
+ SYSTEM_TOPIC_SET.add(RMQ_SYS_CONSUMER_OFFSET_SYNC);
+ SYSTEM_TOPIC_SET.add(RMQ_SYS_DELAY_OFFSET_SYNC);
+ SYSTEM_TOPIC_SET.add(RMQ_SYS_SUBSCRIPTION_GROUP_SYNC);
+ SYSTEM_TOPIC_SET.add(RMQ_SYS_MESSAGE_MODE_SYNC);
+ SYSTEM_TOPIC_SET.add(RMQ_SYS_TIMER_METRICS_SYNC);
+
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC);
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TRANS_HALF_TOPIC);
@@ -71,6 +83,13 @@
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SELF_TEST_TOPIC);
NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
+ NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TOPIC_CONFIG_SYNC);
+ NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_CONSUMER_OFFSET_SYNC);
+ NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_DELAY_OFFSET_SYNC);
+ NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SUBSCRIPTION_GROUP_SYNC);
+ NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_MESSAGE_MODE_SYNC);
+ NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_TIMER_METRICS_SYNC);
+
// regex: ^[%|a-zA-Z0-9_-]+$
// %
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 8b2749e..6cf0b17 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -202,9 +202,9 @@
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
public static final int ADD_WRITE_PERM_OF_BROKER = 327;
-
+
public static final int GET_ALL_PRODUCER_INFO = 328;
-
+
public static final int DELETE_EXPIRED_COMMITLOG = 329;
public static final int GET_TOPIC_CONFIG = 351;
@@ -239,11 +239,14 @@
public static final int RESET_MASTER_FLUSH_OFFSET = 908;
+    /**
+     * Request codes for fetching metadata snapshots, one per metadata kind.
+     * NOTE(review): GET_TOPIC_METRICS_SNAPSHOT presumably refers to the timer metrics snapshot - confirm.
+     */
+    public static final int GET_CONSUMER_OFFSET_SNAPSHOT = 614;
+    public static final int GET_SUBSCRIPTION_GROUP_SNAPSHOT = 615;
+    public static final int GET_TOPIC_CONFIG_SNAPSHOT = 616;
+    public static final int GET_DELAY_OFFSET_SNAPSHOT = 617;
+    public static final int GET_TOPIC_METRICS_SNAPSHOT = 618;
+    public static final int GET_MESSAGE_REQUEST_MODE_SNAPSHOT = 619;
+
     /**
      * Controller code
      */
     public static final int CONTROLLER_ALTER_SYNC_STATE_SET = 1001;
public static final int CONTROLLER_ELECT_MASTER = 1002;
public static final int CONTROLLER_REGISTER_BROKER = 1003;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java
index 31aecd0..b1991ac 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SetMessageRequestModeRequestBody.java
@@ -20,7 +20,7 @@
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-public class SetMessageRequestModeRequestBody extends RemotingSerializable {
+public class SetMessageRequestModeRequestBody extends RemotingSerializable implements Cloneable {
private String topic;
@@ -67,4 +67,9 @@
public void setPopShareQueueNum(int popShareQueueNum) {
this.popShareQueueNum = popShareQueueNum;
}
+
+    /**
+     * Copies this body by delegating to {@code super.clone()} and casting the result.
+     *
+     * @return the copy, typed as {@code SetMessageRequestModeRequestBody}
+     * @throws CloneNotSupportedException propagated from {@code super.clone()}
+     */
+    @Override
+    public SetMessageRequestModeRequestBody clone() throws CloneNotSupportedException {
+        final Object duplicate = super.clone();
+        return (SetMessageRequestModeRequestBody) duplicate;
+    }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
index c9c2a80..6e34bfa 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
@@ -19,12 +19,13 @@
import com.google.common.base.MoreObjects;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.rocketmq.common.MixAll;
-public class SubscriptionGroupConfig {
+public class SubscriptionGroupConfig implements Cloneable {
private String groupName;
@@ -222,6 +223,18 @@
}
+    /**
+     * Copies this config via {@code super.clone()}, then gives the copy its own {@link HashSet} of
+     * {@code subscriptionDataSet} and its own {@link HashMap} of {@code attributes} whenever those are
+     * non-null. The elements held by the two collections are not copied themselves.
+     *
+     * @return the copied config
+     * @throws CloneNotSupportedException propagated from {@code super.clone()}
+     */
     @Override
+    public SubscriptionGroupConfig clone() throws CloneNotSupportedException {
+        final SubscriptionGroupConfig copy = (SubscriptionGroupConfig) super.clone();
+        if (null != this.subscriptionDataSet) {
+            copy.setSubscriptionDataSet(new HashSet<>(this.subscriptionDataSet));
+        }
+        if (null != this.attributes) {
+            copy.setAttributes(new HashMap<>(this.attributes));
+        }
+        return copy;
+    }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("groupName", groupName)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index ba72404..fc4f5a0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -41,6 +41,7 @@
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -52,8 +53,11 @@
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lock = new ReentrantLock();
+    // Defaults to the no-op observer: the single-argument constructor never assigns this field, yet the
+    // topic lookup invokes the observer unconditionally, so a null here would surface as an NPE there.
+    private transient MetadataChangeObserver changeNotifier = new NoopMetadataChangeObserver();
+
private final ConcurrentMap<String, Metric> timingCount = new ConcurrentHashMap<>(1024);
+
private final ConcurrentMap<Integer, Metric> timingDistribution = new ConcurrentHashMap<>(1024);
@SuppressWarnings("DoubleBraceInitialization")
@@ -75,6 +79,11 @@
this.configPath = configPath;
}
+    /**
+     * Creates the metrics holder together with the observer that is notified when a topic entry is added.
+     *
+     * @param configPath     stored as this instance's {@code configPath}
+     * @param changeNotifier observer to notify; {@code null} is replaced by a
+     *                       {@link NoopMetadataChangeObserver} so later callbacks cannot hit a null reference
+     */
+    public TimerMetrics(String configPath, MetadataChangeObserver changeNotifier) {
+        this.configPath = configPath;
+        this.changeNotifier = changeNotifier != null ? changeNotifier : new NoopMetadataChangeObserver();
+    }
+
public long updateDistPair(int period, int value) {
Metric distPair = getDistPair(period);
return distPair.getCount().addAndGet(value);
@@ -107,9 +116,22 @@
return pair;
}
pair = new Metric();
- final Metric previous = timingCount.putIfAbsent(topic, pair);
- if (null != previous) {
- return previous;
+ final Metric[] previous = new Metric[1];
+ Metric finalPair = pair;
+ timingCount.compute(topic, (key, existingValue) -> {
+ if (null != existingValue) {
+ changeNotifier.onUpdated(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, topic, finalPair);
+ previous[0] = existingValue;
+ return existingValue;
+ } else {
+ changeNotifier.onCreated(TopicValidator.RMQ_SYS_TIMER_METRICS_SYNC, topic, finalPair);
+ previous[0] = null;
+ return finalPair;
+ }
+
+ });
+ if (null != previous[0]) {
+ return previous[0];
}
return pair;
}
@@ -287,4 +309,20 @@
}
}
+    /**
+     * Builds a separate map mirroring {@code timingCount}. Each non-null {@code Metric} is re-created
+     * with a new {@link AtomicLong} holding the count read at copy time and with the same timestamp,
+     * so later changes to the live metrics do not show up in the returned map.
+     *
+     * @return the newly built snapshot map
+     */
+    public ConcurrentMap<String, TimerMetrics.Metric> deepCopyTimingCountSnapshot() {
+        final ConcurrentMap<String, TimerMetrics.Metric> copy = new ConcurrentHashMap<>(this.timingCount.size());
+        this.timingCount.forEach((name, source) -> {
+            if (source == null) {
+                return;
+            }
+            final TimerMetrics.Metric duplicate = new TimerMetrics.Metric();
+            duplicate.setCount(new AtomicLong(source.getCount().get()));
+            duplicate.setTimeStamp(source.getTimeStamp());
+            copy.put(name, duplicate);
+        });
+        return copy;
+    }
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index a014e77..9586a08 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -43,6 +43,7 @@
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -122,9 +123,9 @@
if (null == rootDir) {
rootDir = StoreTestUtils.createBaseDir();
}
-
+ NoopMetadataChangeObserver noopMetadataChangeObserver = new NoopMetadataChangeObserver();
TimerCheckpoint timerCheckpoint = new TimerCheckpoint(rootDir + File.separator + "config" + File.separator + "timercheck");
- TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator + "config" + File.separator + "timermetrics");
+ TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator + "config" + File.separator + "timermetrics", noopMetadataChangeObserver);
MessageStore ms = needMock ? mockMessageStore : messageStore;
TimerMessageStore timerMessageStore = new TimerMessageStore(ms, storeConfig, timerCheckpoint, timerMetrics, null);
ms.setTimerMessageStore(timerMessageStore);
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
index f664e67..489498e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
@@ -19,20 +19,25 @@
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.sync.MetadataChangeObserver;
+import org.apache.rocketmq.common.sync.NoopMetadataChangeObserver;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.List;
-
+@ExtendWith(MockitoExtension.class)
public class TimerMetricsTest {
+ private final MetadataChangeObserver syncMetadataChangeObserver = new NoopMetadataChangeObserver();
@Test
public void testTimingCount() {
String baseDir = StoreTestUtils.createBaseDir();
- TimerMetrics first = new TimerMetrics(baseDir);
+ TimerMetrics first = new TimerMetrics(baseDir,syncMetadataChangeObserver);
Assert.assertTrue(first.load());
MessageExt msg = new MessageExt();
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "AAA");
@@ -46,7 +51,7 @@
Assert.assertTrue(first.getTopicPair("AAA").getTimeStamp() <= curr);
first.persist();
- TimerMetrics second = new TimerMetrics(baseDir);
+ TimerMetrics second = new TimerMetrics(baseDir,syncMetadataChangeObserver);
Assert.assertTrue(second.load());
Assert.assertEquals(1000, second.getTimingCount("AAA"));
Assert.assertEquals(2000, second.getTimingCount("BBB"));