[ISSUE #9773] Implement Shared RocksDB Instance for Broker Configs (#9774)
diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
index dcf9061..04e745e 100644
--- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
+++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
@@ -35,17 +35,20 @@
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksDB;
public class LocalAuthenticationMetadataProvider implements AuthenticationMetadataProvider {
+ private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
+ StandardCharsets.UTF_8);
+
private ConfigRocksDBStorage storage;
private LoadingCache<String, User> userCache;
@Override
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
- this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "users");
+ this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "users", false);
if (!this.storage.start()) {
throw new RuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied");
}
@@ -72,7 +75,7 @@
try {
byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
byte[] valueBytes = JSON.toJSONBytes(user);
- this.storage.put(keyBytes, keyBytes.length, valueBytes);
+ this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
this.storage.flushWAL();
this.userCache.invalidate(user.getUsername());
} catch (Exception e) {
@@ -84,7 +87,7 @@
@Override
public CompletableFuture<Void> deleteUser(String username) {
try {
- this.storage.delete(username.getBytes(StandardCharsets.UTF_8));
+ this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, username.getBytes(StandardCharsets.UTF_8));
this.storage.flushWAL();
this.userCache.invalidate(username);
} catch (Exception e) {
@@ -98,7 +101,7 @@
try {
byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
byte[] valueBytes = JSON.toJSONBytes(user);
- this.storage.put(keyBytes, keyBytes.length, valueBytes);
+ this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
this.storage.flushWAL();
this.userCache.invalidate(user.getUsername());
} catch (Exception e) {
@@ -119,20 +122,21 @@
@Override
public CompletableFuture<List<User>> listUser(String filter) {
List<User> result = new ArrayList<>();
- try (RocksIterator iterator = this.storage.iterator()) {
- iterator.seekToFirst();
- while (iterator.isValid()) {
- String username = new String(iterator.key(), StandardCharsets.UTF_8);
+ CompletableFuture<List<User>> future = new CompletableFuture<>();
+ try {
+ this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
+ String username = new String(key, StandardCharsets.UTF_8);
if (StringUtils.isNotBlank(filter) && !username.contains(filter)) {
- iterator.next();
- continue;
+ return;
}
- User user = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), User.class);
+ User user = JSON.parseObject(new String(value, StandardCharsets.UTF_8), User.class);
result.add(user);
- iterator.next();
- }
+ });
+ } catch (Exception e) {
+ future.completeExceptionally(e);
}
- return CompletableFuture.completedFuture(result);
+ future.complete(result);
+ return future;
}
@Override
@@ -154,7 +158,7 @@
public User load(String username) {
try {
byte[] keyBytes = username.getBytes(StandardCharsets.UTF_8);
- byte[] valueBytes = storage.get(keyBytes);
+ byte[] valueBytes = storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
if (ArrayUtils.isEmpty(valueBytes)) {
return EMPTY_USER;
}
diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
index 54d8870..6db999b 100644
--- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
+++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
@@ -40,17 +40,20 @@
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksDB;
public class LocalAuthorizationMetadataProvider implements AuthorizationMetadataProvider {
+ private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
+ StandardCharsets.UTF_8);
+
private ConfigRocksDBStorage storage;
private LoadingCache<String, Acl> aclCache;
@Override
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
- this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "acls");
+ this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "acls", false);
if (!this.storage.start()) {
throw new RuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied.");
}
@@ -77,7 +80,7 @@
Subject subject = acl.getSubject();
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
byte[] valueBytes = JSON.toJSONBytes(acl);
- this.storage.put(keyBytes, keyBytes.length, valueBytes);
+ this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
this.storage.flushWAL();
this.aclCache.invalidate(subject.getSubjectKey());
} catch (Exception e) {
@@ -90,7 +93,7 @@
public CompletableFuture<Void> deleteAcl(Subject subject) {
try {
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
- this.storage.delete(keyBytes);
+ this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
this.storage.flushWAL();
this.aclCache.invalidate(subject.getSubjectKey());
} catch (Exception e) {
@@ -105,7 +108,7 @@
Subject subject = acl.getSubject();
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
byte[] valueBytes = JSON.toJSONBytes(acl);
- this.storage.put(keyBytes, keyBytes.length, valueBytes);
+ this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
this.storage.flushWAL();
this.aclCache.invalidate(subject.getSubjectKey());
} catch (Exception e) {
@@ -126,20 +129,18 @@
@Override
public CompletableFuture<List<Acl>> listAcl(String subjectFilter, String resourceFilter) {
List<Acl> result = new ArrayList<>();
- try (RocksIterator iterator = this.storage.iterator()) {
- iterator.seekToFirst();
- while (iterator.isValid()) {
- String subjectKey = new String(iterator.key(), StandardCharsets.UTF_8);
+ CompletableFuture<List<Acl>> future = new CompletableFuture<>();
+ try {
+ this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
+ String subjectKey = new String(key, StandardCharsets.UTF_8);
if (StringUtils.isNotBlank(subjectFilter) && !subjectKey.contains(subjectFilter)) {
- iterator.next();
- continue;
+ return;
}
Subject subject = Subject.of(subjectKey);
- Acl acl = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), Acl.class);
+ Acl acl = JSON.parseObject(new String(value, StandardCharsets.UTF_8), Acl.class);
List<Policy> policies = acl.getPolicies();
if (!CollectionUtils.isNotEmpty(policies)) {
- iterator.next();
- continue;
+ return;
}
Iterator<Policy> policyIterator = policies.iterator();
while (policyIterator.hasNext()) {
@@ -158,10 +159,12 @@
if (CollectionUtils.isNotEmpty(policies)) {
result.add(Acl.of(subject, policies));
}
- iterator.next();
- }
+ });
+ } catch (Exception e) {
+ future.completeExceptionally(e);
}
- return CompletableFuture.completedFuture(result);
+ future.complete(result);
+ return future;
}
@Override
@@ -185,7 +188,7 @@
byte[] keyBytes = subjectKey.getBytes(StandardCharsets.UTF_8);
Subject subject = Subject.of(subjectKey);
- byte[] valueBytes = this.storage.get(keyBytes);
+ byte[] valueBytes = this.storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
if (ArrayUtils.isEmpty(valueBytes)) {
return EMPTY_ACL;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java
similarity index 63%
rename from broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java
index ee2d4e5..4ebdce1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker;
+package org.apache.rocketmq.broker.config.v1;
import com.alibaba.fastjson.JSON;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
@@ -27,12 +28,16 @@
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
-import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ public static final Charset CHARSET = StandardCharsets.UTF_8;
+
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
@@ -42,21 +47,44 @@
private final CompressionType compressionType;
private DataVersion kvDataVersion = new DataVersion();
+ public static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(CHARSET);
+ public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(CHARSET);
+
+ private final String defaultCF;
+ private final String versionCF;
+
+
+ public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType,
+ String defaultCF, String versionCF) {
+ this.filePath = filePath;
+ this.memTableFlushInterval = memTableFlushInterval;
+ this.compressionType = compressionType;
+ this.defaultCF = defaultCF;
+ this.versionCF = versionCF;
+ }
+
public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
this.compressionType = compressionType;
+ this.defaultCF = new String(RocksDB.DEFAULT_COLUMN_FAMILY, CHARSET);
+ this.versionCF = new String(KV_DATA_VERSION_COLUMN_FAMILY_NAME, CHARSET);
+ }
+
+ public boolean init(boolean readOnly) {
+ this.isStop = false;
+ this.configRocksDBStorage = ConfigRocksDBStorage.getStore(filePath, readOnly, compressionType);
+ return this.configRocksDBStorage.start();
}
public boolean init() {
- this.isStop = false;
- this.configRocksDBStorage = new ConfigRocksDBStorage(filePath, compressionType);
- return this.configRocksDBStorage.start();
+ return this.init(false);
}
+
public boolean loadDataVersion() {
String currDataVersionString = null;
try {
- byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
+ byte[] dataVersion = this.configRocksDBStorage.get(versionCF, KV_DATA_VERSION_KEY);
if (dataVersion != null && dataVersion.length > 0) {
currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8);
}
@@ -68,12 +96,11 @@
}
public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
- try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
- iterator.seekToFirst();
- while (iterator.isValid()) {
- biConsumer.accept(iterator.key(), iterator.value());
- iterator.next();
- }
+ try {
+ configRocksDBStorage.iterate(this.defaultCF, biConsumer);
+ } catch (Exception e) {
+ BROKER_LOG.error("RocksDBConfigManager loadData failed", e);
+ return false;
}
this.flushOptions = new FlushOptions();
@@ -87,9 +114,7 @@
public boolean stop() {
this.isStop = true;
- if (this.configRocksDBStorage != null) {
- return this.configRocksDBStorage.shutdown();
- }
+ ConfigRocksDBStorage.shutdown(filePath);
if (this.flushOptions != null) {
this.flushOptions.close();
}
@@ -115,28 +140,38 @@
}
}
- public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
- this.configRocksDBStorage.put(keyBytes, keyLen, valueBytes);
+ public void put(final byte[] keyBytes, final byte[] valueBytes) throws Exception {
+ this.configRocksDBStorage.put(defaultCF, keyBytes, keyBytes.length, valueBytes);
+ }
+
+ public void put(String cf, String key, String value) throws Exception {
+ byte[] keyBytes = key.getBytes(CHARSET);
+ this.configRocksDBStorage.put(cf, keyBytes, keyBytes.length, value.getBytes(CHARSET));
+ }
+
+ public void put(String cf, final byte[] keyBytes, final byte[] valueBytes) throws Exception {
+ this.configRocksDBStorage.put(cf, keyBytes, keyBytes.length, valueBytes);
}
public void delete(final byte[] keyBytes) throws Exception {
- this.configRocksDBStorage.delete(keyBytes);
+ this.configRocksDBStorage.delete(defaultCF, keyBytes);
}
public void updateKvDataVersion() throws Exception {
kvDataVersion.nextVersion();
- this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
+ this.configRocksDBStorage.put(versionCF, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length,
+ JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
}
public DataVersion getKvDataVersion() {
return kvDataVersion;
}
- public void updateForbidden(String key, String value) throws Exception {
- this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+ // batch operations
+ public void writeBatchPutOperation(WriteBatch writeBatch, final byte[] key, final byte[] value) throws RocksDBException {
+ configRocksDBStorage.writeBatchPutOperation(defaultCF, writeBatch, key, value);
}
-
public void batchPutWithWal(final WriteBatch batch) throws Exception {
this.configRocksDBStorage.batchPutWithWal(batch);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 963c504..2d015af 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -18,12 +18,13 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
-import java.io.File;
-import java.util.Iterator;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -38,13 +39,37 @@
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final String VERSION_COLUMN_FAMILY = "consumerOffsetVersion";
+ private static final String OFFSET_COLUMN_FAMILY = "consumerOffset";
+
protected transient RocksDBConfigManager rocksDBConfigManager;
+ private final boolean useSingleRocksDBForAllConfigs;
+ private final String storePathRootDir;
+
+ public RocksDBConsumerOffsetManager(BrokerController brokerController, boolean useSingleRocksDB,
+ String storePathRootDir) {
+ super(brokerController);
+
+ this.useSingleRocksDBForAllConfigs = useSingleRocksDB;
+ this.storePathRootDir = StringUtils.isBlank(storePathRootDir) ?
+ brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+
+ long flushInterval = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+ CompressionType compressionType =
+ CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+ String rocksDBPath = rocksdbConfigFilePath(storePathRootDir, useSingleRocksDB);
+
+ this.rocksDBConfigManager = useSingleRocksDB ? new RocksDBConfigManager(rocksDBPath, flushInterval,
+ compressionType, OFFSET_COLUMN_FAMILY, VERSION_COLUMN_FAMILY) : new RocksDBConfigManager(rocksDBPath,
+ flushInterval, compressionType);
+ }
+
+ public RocksDBConsumerOffsetManager(BrokerController brokerController, boolean useSingleRocksDBForAllConfigs) {
+ this(brokerController, useSingleRocksDBForAllConfigs, null);
+ }
public RocksDBConsumerOffsetManager(BrokerController brokerController) {
- super(brokerController);
- this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
- CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
-
+ this(brokerController, brokerController.getBrokerConfig().isUseSingleRocksDBForAllConfigs(), null);
}
@Override
@@ -55,7 +80,9 @@
if (!loadDataVersion() || !loadConsumerOffset()) {
return false;
}
-
+ if (useSingleRocksDBForAllConfigs) {
+ migrateFromSeparateRocksDBs();
+ }
return true;
}
@@ -112,19 +139,27 @@
log.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}
- public String rocksdbConfigFilePath() {
- return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
+ public String rocksdbConfigFilePath(String storePathRootDir, boolean useSingleRocksDBForAllConfigs) {
+ if (StringUtils.isBlank(storePathRootDir)) {
+ storePathRootDir = brokerController.getMessageStoreConfig().getStorePathRootDir();
+ }
+ Path rootPath = Paths.get(storePathRootDir);
+ if (useSingleRocksDBForAllConfigs) {
+ return rootPath.resolve("config").resolve("metadata").toString();
+ }
+ return rootPath.resolve("config").resolve("consumerOffsets").toString();
+ }
+
+ @Override
+ public String configFilePath() {
+ return BrokerPathConfigHelper.getConsumerOffsetPath(this.storePathRootDir);
}
@Override
public synchronized void persist() {
- WriteBatch writeBatch = new WriteBatch();
- try {
- Iterator<Entry<String, ConcurrentMap<Integer, Long>>> iterator = this.offsetTable.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ConcurrentMap<Integer, Long>> entry = iterator.next();
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ for (Entry<String, ConcurrentMap<Integer, Long>> entry : this.offsetTable.entrySet()) {
putWriteBatch(writeBatch, entry.getKey(), entry.getValue());
-
if (writeBatch.getDataSize() >= 4 * 1024) {
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
}
@@ -133,8 +168,6 @@
this.rocksDBConfigManager.flushWAL();
} catch (Exception e) {
log.error("consumer offset persist Failed", e);
- } finally {
- writeBatch.close();
}
}
@@ -148,7 +181,7 @@
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
wrapper.setOffsetTable(offsetMap);
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
- writeBatch.put(keyBytes, valueBytes);
+ rocksDBConfigManager.writeBatchPutOperation(writeBatch, keyBytes, valueBytes);
}
@Override
@@ -161,6 +194,7 @@
return rocksDBConfigManager.getKvDataVersion();
}
+ @Override
public void updateDataVersion() {
try {
rocksDBConfigManager.updateKvDataVersion();
@@ -169,4 +203,98 @@
throw new RuntimeException(e);
}
}
+
+ /**
+ * Migrate data from separate RocksDB instances to the unified RocksDB when useSingleRocksDBForAllConfigs is
+ * enabled.
+ * This method will only be called when switching from separate RocksDB mode to unified mode.
+ * It opens the separate RocksDB in read-only mode, compares versions, and imports data if needed.
+ */
+ private void migrateFromSeparateRocksDBs() {
+ String separateRocksDBPath = rocksdbConfigFilePath(this.storePathRootDir, false);
+
+ // Check if separate RocksDB exists
+ if (!UtilAll.isPathExists(separateRocksDBPath)) {
+ log.info("Separate RocksDB for consumer offsets does not exist at {}, no migration needed",
+ separateRocksDBPath);
+ return;
+ }
+
+ log.info("Starting migration from separate RocksDB at {} to unified RocksDB", separateRocksDBPath);
+
+ // Open separate RocksDB in read-only mode
+ RocksDBConfigManager separateRocksDBConfigManager = null;
+ try {
+ long memTableFlushIntervalMs = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+ org.rocksdb.CompressionType compressionType =
+ org.rocksdb.CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+
+ separateRocksDBConfigManager = new RocksDBConfigManager(separateRocksDBPath, memTableFlushIntervalMs,
+ compressionType);
+
+ // Initialize in read-only mode
+ if (!separateRocksDBConfigManager.init(true)) {
+ log.error("Failed to initialize separate RocksDB in read-only mode");
+ return;
+ }
+
+ // Load data version from separate RocksDB
+ if (!separateRocksDBConfigManager.loadDataVersion()) {
+ log.error("Failed to load data version from separate RocksDB");
+ return;
+ }
+
+ DataVersion separateDataVersion = separateRocksDBConfigManager.getKvDataVersion();
+ DataVersion unifiedDataVersion = this.getDataVersion();
+
+ log.info("Comparing data versions - Separate: {}, Unified: {}", separateDataVersion, unifiedDataVersion);
+
+ // Compare versions and import if separate version is newer
+ if (separateDataVersion.getCounter().get() > unifiedDataVersion.getCounter().get()) {
+ log.info("Separate RocksDB has newer data, importing...");
+
+ // Load consumer offsets from separate RocksDB
+ if (separateRocksDBConfigManager.loadData(this::importConsumerOffset)) {
+ log.info("Successfully imported consumer offsets from separate RocksDB");
+
+ // Update unified data version to be newer than separate one
+ this.getDataVersion().assignNewOne(separateDataVersion);
+ this.getDataVersion().nextVersion(); // Make it one version higher
+ updateDataVersion();
+
+ log.info("Updated unified data version to {}", this.getDataVersion());
+ } else {
+ log.error("Failed to import consumer offsets from separate RocksDB");
+ }
+ } else {
+ log.info("Unified RocksDB is already up-to-date, no migration needed");
+ }
+ } catch (Exception e) {
+ log.error("Error during migration from separate RocksDB", e);
+ } finally {
+ // Clean up resources
+ if (separateRocksDBConfigManager != null) {
+ try {
+ separateRocksDBConfigManager.stop();
+ } catch (Exception e) {
+ log.warn("Error stopping separate RocksDB config manager", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Import a consumer offset from the separate RocksDB during migration
+ *
+ * @param key The topic@group name bytes
+ * @param body The consumer offset data bytes
+ */
+ private void importConsumerOffset(final byte[] key, final byte[] body) {
+ try {
+ decodeOffset(key, body);
+ this.rocksDBConfigManager.put(key, body);
+ } catch (Exception e) {
+ log.error("Error importing consumer offset", e);
+ }
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index f7e0de9..f6ae3a3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -19,30 +19,57 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
-import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.rocksdb.CompressionType;
-import org.rocksdb.RocksIterator;
public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
protected transient RocksDBConfigManager rocksDBConfigManager;
- public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
+ private static final String VERSION_COLUMN_FAMILY = "subscriptionGroupVersion";
+ private static final String GROUP_COLUMN_FAMILY = "subscriptionGroup";
+ private static final String FORBIDDEN_COLUMN_FAMILY_NAME = "forbidden";
+
+ private final boolean useSingleRocksDBForAllConfigs;
+ private final String storePathRootDir;
+
+ public RocksDBSubscriptionGroupManager(BrokerController brokerController, boolean useSingleRocksDB,
+ String storePathRootDir) {
super(brokerController, false);
- this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
- CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
+
+ this.useSingleRocksDBForAllConfigs = useSingleRocksDB;
+ this.storePathRootDir = StringUtils.isBlank(storePathRootDir) ?
+ brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+
+ long flushInterval = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+ CompressionType compressionType =
+ CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+ String rocksDBPath = rocksdbConfigFilePath(storePathRootDir, useSingleRocksDB);
+
+ this.rocksDBConfigManager = useSingleRocksDB ? new RocksDBConfigManager(rocksDBPath, flushInterval,
+ compressionType, GROUP_COLUMN_FAMILY, VERSION_COLUMN_FAMILY) : new RocksDBConfigManager(rocksDBPath,
+ flushInterval, compressionType);
+ }
+
+ public RocksDBSubscriptionGroupManager(BrokerController brokerController, boolean useSingleRocksDBForAllConfigs) {
+ this(brokerController, useSingleRocksDBForAllConfigs, null);
+ }
+
+ public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
+ this(brokerController, brokerController.getBrokerConfig().isUseSingleRocksDBForAllConfigs(), null);
}
@Override
@@ -53,6 +80,9 @@
if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) {
return false;
}
+ if (useSingleRocksDBForAllConfigs) {
+ migrateFromSeparateRocksDBs();
+ }
this.init();
return true;
}
@@ -68,14 +98,13 @@
}
public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {
- try (RocksIterator iterator = this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) {
- iterator.seekToFirst();
- while (iterator.isValid()) {
- biConsumer.accept(iterator.key(), iterator.value());
- iterator.next();
- }
+ try {
+ this.rocksDBConfigManager.configRocksDBStorage.iterate(FORBIDDEN_COLUMN_FAMILY_NAME, biConsumer);
+ return true;
+ } catch (Exception e) {
+ log.error("loadForbidden exception", e);
}
- return true;
+ return false;
}
private boolean merge() {
@@ -102,7 +131,8 @@
final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = this.getForbiddenTable();
for (Map.Entry<String, ConcurrentMap<String, Integer>> entry : forbiddenTable.entrySet()) {
try {
- this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue()));
+ this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, entry.getKey(),
+ JSON.toJSONString(entry.getValue()));
log.info("import forbidden config to rocksdb, group={}", entry.getValue());
} catch (Exception e) {
log.error("import forbidden config to rocksdb failed, group={}", entry.getValue());
@@ -131,9 +161,9 @@
SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
try {
- byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
+ byte[] keyBytes = groupName.getBytes(RocksDBConfigManager.CHARSET);
byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
- this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes);
+ this.rocksDBConfigManager.put(keyBytes, valueBytes);
} catch (Exception e) {
log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString());
}
@@ -146,9 +176,9 @@
SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.putIfAbsent(groupName, subscriptionGroupConfig);
if (oldConfig == null) {
try {
- byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
+ byte[] keyBytes = groupName.getBytes(RocksDBConfigManager.CHARSET);
byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
- this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes);
+ this.rocksDBConfigManager.put(keyBytes, valueBytes);
} catch (Exception e) {
log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString());
}
@@ -160,7 +190,7 @@
protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.remove(groupName);
try {
- this.rocksDBConfigManager.delete(groupName.getBytes(DataConverter.CHARSET_UTF8));
+ this.rocksDBConfigManager.delete(groupName.getBytes(RocksDBConfigManager.CHARSET));
} catch (Exception e) {
log.error("kv delete sub Failed, {}", subscriptionGroupConfig.toString());
}
@@ -169,7 +199,7 @@
protected void decodeSubscriptionGroup(byte[] key, byte[] body) {
- String groupName = new String(key, DataConverter.CHARSET_UTF8);
+ String groupName = new String(key, RocksDBConfigManager.CHARSET);
SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(body, SubscriptionGroupConfig.class);
this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
@@ -188,8 +218,20 @@
super.persist();
}
- public String rocksdbConfigFilePath() {
- return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator;
+ public String rocksdbConfigFilePath(String storePathRootDir, boolean useSingleRocksDBForAllConfigs) {
+ if (StringUtils.isBlank(storePathRootDir)) {
+ storePathRootDir = brokerController.getMessageStoreConfig().getStorePathRootDir();
+ }
+ Path rootPath = Paths.get(storePathRootDir);
+ if (useSingleRocksDBForAllConfigs) {
+ return rootPath.resolve("config").resolve("metadata").toString();
+ }
+ return rootPath.resolve("config").resolve("subscriptionGroups").toString();
+ }
+
+ @Override
+ public String configFilePath() {
+ return BrokerPathConfigHelper.getSubscriptionGroupPath(this.storePathRootDir);
}
@Override
@@ -208,8 +250,8 @@
}
protected void decodeForbidden(byte[] key, byte[] body) {
- String forbiddenGroupName = new String(key, DataConverter.CHARSET_UTF8);
- JSONObject jsonObject = JSON.parseObject(new String(body, DataConverter.CHARSET_UTF8));
+ String forbiddenGroupName = new String(key, RocksDBConfigManager.CHARSET);
+ JSONObject jsonObject = JSON.parseObject(new String(body, RocksDBConfigManager.CHARSET));
Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
ConcurrentMap<String, Integer> forbiddenGroup = new ConcurrentHashMap<>(entries.size());
for (Map.Entry<String, Object> entry : entries) {
@@ -223,7 +265,8 @@
public void updateForbidden(String group, String topic, int forbiddenIndex, boolean setOrClear) {
try {
super.updateForbidden(group, topic, forbiddenIndex, setOrClear);
- this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
+ this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, group,
+ JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -233,7 +276,8 @@
public void setForbidden(String group, String topic, int forbiddenIndex) {
try {
super.setForbidden(group, topic, forbiddenIndex);
- this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
+ this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, group,
+ JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -243,9 +287,131 @@
public void clearForbidden(String group, String topic, int forbiddenIndex) {
try {
super.clearForbidden(group, topic, forbiddenIndex);
- this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
+ this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, group,
+ JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Migrate data from separate RocksDB instances to the unified RocksDB when useSingleRocksDBForAllConfigs is
+ * enabled.
+ * This method will only be called when switching from separate RocksDB mode to unified mode.
+ * It opens the separate RocksDB in read-only mode, compares versions, and imports data if needed.
+ */
+ private void migrateFromSeparateRocksDBs() {
+ String separateRocksDBPath = rocksdbConfigFilePath(this.storePathRootDir, false);
+
+ // Check if separate RocksDB exists
+ if (!org.apache.rocketmq.common.UtilAll.isPathExists(separateRocksDBPath)) {
+ log.info("Separate RocksDB for subscription groups does not exist at {}, no migration needed",
+ separateRocksDBPath);
+ return;
+ }
+
+ log.info("Starting migration from separate RocksDB at {} to unified RocksDB", separateRocksDBPath);
+
+ // Open separate RocksDB in read-only mode
+ RocksDBConfigManager separateRocksDBConfigManager = null;
+ try {
+ long memTableFlushIntervalMs = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+ org.rocksdb.CompressionType compressionType =
+ org.rocksdb.CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+
+ separateRocksDBConfigManager = new RocksDBConfigManager(separateRocksDBPath, memTableFlushIntervalMs,
+ compressionType);
+
+ // Initialize in read-only mode
+ if (!separateRocksDBConfigManager.init(true)) {
+ log.error("Failed to initialize separate RocksDB in read-only mode");
+ return;
+ }
+
+ // Load data version from separate RocksDB
+ if (!separateRocksDBConfigManager.loadDataVersion()) {
+ log.error("Failed to load data version from separate RocksDB");
+ return;
+ }
+
+ org.apache.rocketmq.remoting.protocol.DataVersion separateDataVersion =
+ separateRocksDBConfigManager.getKvDataVersion();
+ org.apache.rocketmq.remoting.protocol.DataVersion unifiedDataVersion = this.getDataVersion();
+
+ log.info("Comparing data versions - Separate: {}, Unified: {}", separateDataVersion, unifiedDataVersion);
+
+ // Compare versions and import if separate version is newer
+ if (separateDataVersion.getCounter().get() > unifiedDataVersion.getCounter().get()) {
+ log.info("Separate RocksDB has newer data, importing...");
+
+ // Load subscription groups from separate RocksDB
+ boolean success = separateRocksDBConfigManager.loadData(this::importSubscriptionGroup);
+ if (success) {
+ // Load forbidden data directly using the storage
+ try {
+ separateRocksDBConfigManager.configRocksDBStorage.iterate(FORBIDDEN_COLUMN_FAMILY_NAME,
+ this::importForbidden);
+ log.info("Successfully imported subscription groups and forbidden data from separate RocksDB");
+
+ // Update unified data version to be newer than separate one
+ this.getDataVersion().assignNewOne(separateDataVersion);
+ this.getDataVersion().nextVersion(); // Make it one version higher
+ updateDataVersion();
+
+ log.info("Updated unified data version to {}", this.getDataVersion());
+ } catch (Exception e) {
+ log.error("Failed to import forbidden data from separate RocksDB", e);
+ success = false;
+ }
+ }
+
+ if (!success) {
+ log.error("Failed to import subscription groups or forbidden data from separate RocksDB");
+ }
+ } else {
+ log.info("Unified RocksDB is already up-to-date, no migration needed");
+ }
+ } catch (Exception e) {
+ log.error("Error during migration from separate RocksDB", e);
+ } finally {
+ // Clean up resources
+ if (separateRocksDBConfigManager != null) {
+ try {
+ separateRocksDBConfigManager.stop();
+ } catch (Exception e) {
+ log.warn("Error stopping separate RocksDB config manager", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Import a subscription group from the separate RocksDB during migration
+ *
+ * @param key The group name bytes
+ * @param body The subscription group data bytes
+ */
+ private void importSubscriptionGroup(byte[] key, byte[] body) {
+ try {
+ decodeSubscriptionGroup(key, body);
+ this.rocksDBConfigManager.put(key, body);
+ } catch (Exception e) {
+ log.error("Error importing subscription group", e);
+ }
+ }
+
+ /**
+ * Import forbidden data from the separate RocksDB during migration
+ *
+ * @param key The group name bytes
+ * @param body The forbidden data bytes
+ */
+ private void importForbidden(byte[] key, byte[] body) {
+ try {
+ decodeForbidden(key, body);
+ this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, key, body);
+ } catch (Exception e) {
+ log.error("Error importing forbidden data", e);
+ }
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index d64f808..4a8d124 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -18,11 +18,13 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
-import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
@@ -31,13 +33,37 @@
import org.rocksdb.CompressionType;
public class RocksDBTopicConfigManager extends TopicConfigManager {
+ private static final String VERSION_COLUMN_FAMILY = "topicVersion";
+ private static final String TOPIC_COLUMN_FAMILY = "topic";
protected transient RocksDBConfigManager rocksDBConfigManager;
+ private final boolean useSingleRocksDBForAllConfigs;
+ private final String storePathRootDir;
+
+ public RocksDBTopicConfigManager(BrokerController brokerController, boolean useSingleRocksDB,
+ String storePathRootDir) {
+ super(brokerController, false);
+
+ this.useSingleRocksDBForAllConfigs = useSingleRocksDB;
+ this.storePathRootDir = StringUtils.isBlank(storePathRootDir) ?
+ brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+
+ long flushInterval = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+ CompressionType compressionType =
+ CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+ String rocksDBPath = rocksdbConfigFilePath(storePathRootDir, useSingleRocksDB);
+
+ this.rocksDBConfigManager = useSingleRocksDB ? new RocksDBConfigManager(rocksDBPath, flushInterval,
+ compressionType, TOPIC_COLUMN_FAMILY, VERSION_COLUMN_FAMILY) : new RocksDBConfigManager(rocksDBPath,
+ flushInterval, compressionType);
+ }
+
+ public RocksDBTopicConfigManager(BrokerController brokerController, boolean useSingleRocksDBForAllConfigs) {
+ this(brokerController, useSingleRocksDBForAllConfigs, null);
+ }
public RocksDBTopicConfigManager(BrokerController brokerController) {
- super(brokerController, false);
- this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
- CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
+ this(brokerController, brokerController.getBrokerConfig().isUseSingleRocksDBForAllConfigs(), null);
}
@Override
@@ -48,6 +74,9 @@
if (!loadDataVersion() || !loadTopicConfig()) {
return false;
}
+ if (useSingleRocksDBForAllConfigs) {
+ migrateFromSeparateRocksDBs();
+ }
this.init();
return true;
}
@@ -114,7 +143,7 @@
try {
byte[] keyBytes = topicName.getBytes(DataConverter.CHARSET_UTF8);
byte[] valueBytes = JSON.toJSONBytes(topicConfig, SerializerFeature.BrowserCompatible);
- this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes);
+ this.rocksDBConfigManager.put(keyBytes, valueBytes);
} catch (Exception e) {
log.error("kv put topic Failed, {}", topicConfig.toString(), e);
}
@@ -144,10 +173,21 @@
super.persist();
}
- public String rocksdbConfigFilePath() {
- return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator;
+ public String rocksdbConfigFilePath(String storePathRootDir, boolean useSingleRocksDBForAllConfigs) {
+ if (StringUtils.isBlank(storePathRootDir)) {
+ storePathRootDir = brokerController.getMessageStoreConfig().getStorePathRootDir();
+ }
+ Path rootPath = Paths.get(storePathRootDir);
+ if (useSingleRocksDBForAllConfigs) {
+ return rootPath.resolve("config").resolve("metadata").toString();
+ }
+ return rootPath.resolve("config").resolve("topics").toString();
}
+ @Override
+ public String configFilePath() {
+ return BrokerPathConfigHelper.getTopicConfigPath(this.storePathRootDir);
+ }
@Override
public DataVersion getDataVersion() {
@@ -163,4 +203,94 @@
throw new RuntimeException(e);
}
}
+
+ /**
+ * Migrate data from separate RocksDB instances to the unified RocksDB when useSingleRocksDBForAllConfigs is
+ * enabled.
+ * This method will only be called when switching from separate RocksDB mode to unified mode.
+ * It opens the separate RocksDB in read-only mode, compares versions, and imports data if needed.
+ */
+ private void migrateFromSeparateRocksDBs() {
+ String separateRocksDBPath = rocksdbConfigFilePath(this.storePathRootDir, false);
+
+ // Check if separate RocksDB exists
+ if (!UtilAll.isPathExists(separateRocksDBPath)) {
+ log.info("Separate RocksDB for topics does not exist at {}, no migration needed", separateRocksDBPath);
+ return;
+ }
+
+ log.info("Starting migration from separate RocksDB at {} to unified RocksDB", separateRocksDBPath);
+
+ // Open separate RocksDB in read-only mode
+ RocksDBConfigManager separateRocksDBConfigManager = null;
+ try {
+ long memTableFlushIntervalMs = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+ org.rocksdb.CompressionType compressionType =
+ org.rocksdb.CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+
+ separateRocksDBConfigManager = new RocksDBConfigManager(separateRocksDBPath, memTableFlushIntervalMs,
+ compressionType);
+
+ // Initialize in read-only mode
+ if (!separateRocksDBConfigManager.init(true)) {
+ log.error("Failed to initialize separate RocksDB in read-only mode");
+ return;
+ }
+
+ // Load data version from separate RocksDB
+ if (!separateRocksDBConfigManager.loadDataVersion()) {
+ log.error("Failed to load data version from separate RocksDB");
+ return;
+ }
+
+ DataVersion separateDataVersion = separateRocksDBConfigManager.getKvDataVersion();
+ DataVersion unifiedDataVersion = this.getDataVersion();
+
+ log.info("Comparing data versions - Separate: {}, Unified: {}", separateDataVersion, unifiedDataVersion);
+
+ // Compare versions and import if separate version is newer
+ if (separateDataVersion.getCounter().get() > unifiedDataVersion.getCounter().get()) {
+ log.info("Separate RocksDB has newer data, importing...");
+
+ // Load topic configs from separate RocksDB
+ if (separateRocksDBConfigManager.loadData(this::importTopicConfig)) {
+ log.info("Successfully imported topic configs from separate RocksDB");
+
+ this.getDataVersion().assignNewOne(separateDataVersion);
+ this.getDataVersion().nextVersion(); // Make it one version higher
+ updateDataVersion();
+ log.info("Updated unified data version to {}", this.getDataVersion());
+ } else {
+ log.error("Failed to import topic configs from separate RocksDB");
+ }
+ } else {
+ log.info("Unified RocksDB is already up-to-date, no migration needed");
+ }
+ } catch (Exception e) {
+ log.error("Error during migration from separate RocksDB", e);
+ } finally {
+ if (separateRocksDBConfigManager != null) {
+ try {
+ separateRocksDBConfigManager.stop();
+ } catch (Exception e) {
+ log.warn("Error stopping separate RocksDB config manager", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Import a topic config from the separate RocksDB during migration
+ *
+ * @param key The topic name bytes
+ * @param body The topic config data bytes
+ */
+ private void importTopicConfig(byte[] key, byte[] body) {
+ try {
+ decodeTopicConfig(key, body);
+ this.rocksDBConfigManager.put(key, body);
+ } catch (Exception e) {
+ log.error("Error importing topic config", e);
+ }
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index e1e1cb4..3eee9fc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.offset;
+import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,9 +27,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Strings;
-
import java.util.function.Function;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -217,8 +215,7 @@
}
}
if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep() == 0) {
- long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
+ updateDataVersion();
}
}
@@ -378,6 +375,12 @@
return dataVersion;
}
+ public void updateDataVersion() {
+ long stateMachineVersion = brokerController.getMessageStore() != null ?
+ brokerController.getMessageStore().getStateMachineVersion() : 0;
+ dataVersion.nextVersion(stateMachineVersion);
+ }
+
public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index ed46dfd..f59651f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -676,7 +676,7 @@
public String encode(final boolean prettyFormat) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
- topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
+ topicConfigSerializeWrapper.setDataVersion(getDataVersion());
return topicConfigSerializeWrapper.toJson(prettyFormat);
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManagerMigrationTest.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManagerMigrationTest.java
new file mode 100644
index 0000000..697316e
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManagerMigrationTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v1;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RocksDBConsumerOffsetManagerMigrationTest {
+
+ private static final String TEST_GROUP = "TestGroup";
+ private static final String TEST_TOPIC = "TestTopic";
+ private static final String TEST_KEY = TEST_TOPIC + "@" + TEST_GROUP;
+
+ private BrokerController brokerController;
+ private String storePath;
+ private String separateRocksDBPath;
+ private String unifiedRocksDBPath;
+
+ @Before
+ public void init() {
+ brokerController = Mockito.mock(BrokerController.class);
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ storePath = System.getProperty("java.io.tmpdir") + File.separator + "rocketmq-test-" + System.currentTimeMillis();
+ messageStoreConfig.setStorePathRootDir(storePath);
+ Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setConsumerOffsetUpdateVersionStep(1);
+ Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+
+ separateRocksDBPath = storePath + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
+ unifiedRocksDBPath = storePath + File.separator + "config" + File.separator + "metadata" + File.separator;
+
+ // Create directories
+ UtilAll.ensureDirOK(separateRocksDBPath);
+ UtilAll.ensureDirOK(unifiedRocksDBPath);
+ }
+
+ @After
+ public void destroy() {
+ // Clean up test directories
+ UtilAll.deleteFile(new File(storePath));
+ }
+
+ @Test
+ public void testMigrationFromSeparateToUnifiedRocksDB() {
+
+ // First, create data in separate RocksDB mode
+ RocksDBConsumerOffsetManager separateManager = new RocksDBConsumerOffsetManager(brokerController, false);
+ separateManager.load();
+
+ // Add some consumer offsets
+ separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 0, 100L);
+ separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 1, 200L);
+ separateManager.persist();
+ separateManager.stop();
+
+ // Now create unified RocksDB manager which should migrate data
+ RocksDBConsumerOffsetManager unifiedManager = new RocksDBConsumerOffsetManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully", loaded);
+
+ // Verify that data was migrated
+ ConcurrentMap<Integer, Long> migratedOffsetMap = unifiedManager.getOffsetTable().get(TEST_KEY);
+ Assert.assertNotNull("Consumer offset should be migrated", migratedOffsetMap);
+ Assert.assertEquals("Offset for queue 0 should match", Long.valueOf(100L), migratedOffsetMap.get(0));
+ Assert.assertEquals("Offset for queue 1 should match", Long.valueOf(200L), migratedOffsetMap.get(1));
+
+ unifiedManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 0, 300L);
+ unifiedManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 1, 400L);
+ unifiedManager.persist();
+ unifiedManager.stop();
+
+ // reload unified RocksDB manager which should not migrate data
+ unifiedManager = new RocksDBConsumerOffsetManager(brokerController, true);
+ unifiedManager.load();
+
+ // Verify that data was new
+ migratedOffsetMap = unifiedManager.getOffsetTable().get(TEST_KEY);
+ Assert.assertEquals("Offset for queue 0 should match", Long.valueOf(300L), migratedOffsetMap.get(0));
+ Assert.assertEquals("Offset for queue 1 should match", Long.valueOf(400L), migratedOffsetMap.get(1));
+ unifiedManager.stop();
+ }
+
+ @Test
+ public void testMigrationWithNoSeparateRocksDB() {
+
+ // Ensure separate RocksDB doesn't exist
+ UtilAll.deleteFile(new File(separateRocksDBPath));
+
+ // Create unified RocksDB manager - should not fail even without separate DB
+ RocksDBConsumerOffsetManager unifiedManager = new RocksDBConsumerOffsetManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully even without separate DB", loaded);
+
+ unifiedManager.stop();
+ }
+
+ @Test
+ public void testNoMigrationWhenDisabled() {
+
+ // Create data in separate RocksDB mode
+ RocksDBConsumerOffsetManager separateManager = new RocksDBConsumerOffsetManager(brokerController, false);
+ separateManager.load();
+
+ separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 0, 100L);
+ separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 1, 200L);
+ separateManager.persist();
+ separateManager.stop();
+
+ long version = separateManager.getDataVersion().getCounter().get();
+ Assert.assertEquals(2, version);
+
+ // Create another separate manager - should not trigger migration
+ RocksDBConsumerOffsetManager anotherSeparateManager = new RocksDBConsumerOffsetManager(brokerController, false);
+ boolean loaded = anotherSeparateManager.load();
+ Assert.assertTrue("Separate manager should load successfully", loaded);
+
+ anotherSeparateManager.loadDataVersion();
+ Assert.assertEquals(version, anotherSeparateManager.getDataVersion().getCounter().get());
+ anotherSeparateManager.stop();
+ }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManagerMigrationTest.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManagerMigrationTest.java
new file mode 100644
index 0000000..cb23c9f
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManagerMigrationTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v1;
+
+import java.io.File;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RocksDBSubscriptionGroupManagerMigrationTest {
+
+ private static final String TEST_GROUP = "TestGroup";
+
+ private BrokerController brokerController;
+ private String storePath;
+ private String separateRocksDBPath;
+ private String unifiedRocksDBPath;
+
+ @Before
+ public void init() {
+
+ brokerController = Mockito.mock(BrokerController.class);
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ storePath = System.getProperty("java.io.tmpdir") + File.separator + "rocketmq-test-" + System.currentTimeMillis();
+ messageStoreConfig.setStorePathRootDir(storePath);
+ Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
+ separateRocksDBPath = storePath + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator;
+ unifiedRocksDBPath = storePath + File.separator + "config" + File.separator + "metadata" + File.separator;
+
+ // Create directories
+ UtilAll.ensureDirOK(separateRocksDBPath);
+ UtilAll.ensureDirOK(unifiedRocksDBPath);
+ }
+
+ @After
+ public void destroy() {
+
+ // Clean up test directories
+ UtilAll.deleteFile(new File(storePath));
+ }
+
+ @Test
+ public void testMigrationFromSeparateToUnifiedRocksDB() {
+
+ // First, create data in separate RocksDB mode
+ RocksDBSubscriptionGroupManager separateManager = new RocksDBSubscriptionGroupManager(brokerController, false);
+ separateManager.load();
+
+ // Add some subscription groups
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(TEST_GROUP);
+ groupConfig.setConsumeEnable(true);
+ groupConfig.setConsumeFromMinEnable(true);
+ groupConfig.setRetryMaxTimes(3);
+ separateManager.updateSubscriptionGroupConfig(groupConfig);
+ separateManager.persist();
+ separateManager.stop();
+
+ {
+ // Now create unified RocksDB manager which should migrate data
+ RocksDBSubscriptionGroupManager unifiedManager = new RocksDBSubscriptionGroupManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully", loaded);
+
+ // Verify that data was migrated
+ SubscriptionGroupConfig migratedConfig = unifiedManager.findSubscriptionGroupConfig(TEST_GROUP);
+ Assert.assertNotNull("Subscription group should be migrated", migratedConfig);
+ Assert.assertEquals("Group name should match", TEST_GROUP, migratedConfig.getGroupName());
+ Assert.assertEquals("Retry max times should match", 3, migratedConfig.getRetryMaxTimes());
+ Assert.assertTrue("Consume enable should match", migratedConfig.isConsumeEnable());
+ Assert.assertTrue("Consume from min enable should match", migratedConfig.isConsumeFromMinEnable());
+
+ groupConfig.setRetryMaxTimes(4);
+ unifiedManager.updateSubscriptionGroupConfig(groupConfig);
+ unifiedManager.persist();
+ unifiedManager.stop();
+ }
+
+ {
+ // Now create unified RocksDB manager which should migrate data
+ RocksDBSubscriptionGroupManager unifiedManager = new RocksDBSubscriptionGroupManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully", loaded);
+
+ // Verify that data was migrated
+ SubscriptionGroupConfig migratedConfig = unifiedManager.findSubscriptionGroupConfig(TEST_GROUP);
+ Assert.assertNotNull("Subscription group should be migrated", migratedConfig);
+ Assert.assertEquals("Group name should match", TEST_GROUP, migratedConfig.getGroupName());
+ Assert.assertEquals("Retry max times should match", 4, migratedConfig.getRetryMaxTimes());
+ Assert.assertTrue("Consume enable should match", migratedConfig.isConsumeEnable());
+ Assert.assertTrue("Consume from min enable should match", migratedConfig.isConsumeFromMinEnable());
+
+ unifiedManager.stop();
+ }
+ }
+
+ @Test
+ public void testMigrationWithNoSeparateRocksDB() {
+
+ // Ensure separate RocksDB doesn't exist
+ UtilAll.deleteFile(new File(separateRocksDBPath));
+
+ // Create unified RocksDB manager - should not fail even without separate DB
+ RocksDBSubscriptionGroupManager unifiedManager = new RocksDBSubscriptionGroupManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully even without separate DB", loaded);
+
+ unifiedManager.stop();
+ }
+
+ @Test
+ public void testNoMigrationWhenDisabled() {
+
+ // Create data in separate RocksDB mode
+ RocksDBSubscriptionGroupManager separateManager = new RocksDBSubscriptionGroupManager(brokerController, false);
+ separateManager.load();
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(TEST_GROUP);
+ groupConfig.setConsumeEnable(true);
+ groupConfig.setConsumeFromMinEnable(true);
+ separateManager.putSubscriptionGroupConfig(groupConfig);
+ separateManager.persist();
+ separateManager.stop();
+
+ // Create another separate manager - should not trigger migration
+ RocksDBSubscriptionGroupManager anotherSeparateManager = new RocksDBSubscriptionGroupManager(brokerController, false);
+ boolean loaded = anotherSeparateManager.load();
+ Assert.assertTrue("Separate manager should load successfully", loaded);
+
+ anotherSeparateManager.stop();
+ }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManagerMigrationTest.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManagerMigrationTest.java
new file mode 100644
index 0000000..e7097bf
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManagerMigrationTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v1;
+
+import java.io.File;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RocksDBTopicConfigManagerMigrationTest {
+
+ private static final String TEST_TOPIC = "TestTopic";
+
+ private BrokerController brokerController;
+ private String storePath;
+ private String separateRocksDBPath;
+ private String unifiedRocksDBPath;
+
+ @Before
+ public void init() {
+
+ brokerController = Mockito.mock(BrokerController.class);
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ storePath = System.getProperty("java.io.tmpdir") + File.separator + "rocketmq-test-" + System.currentTimeMillis();
+ messageStoreConfig.setStorePathRootDir(storePath);
+ Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+ Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
+
+ separateRocksDBPath = storePath + File.separator + "config" + File.separator + "topics" + File.separator;
+ unifiedRocksDBPath = storePath + File.separator + "config" + File.separator + "metadata" + File.separator;
+
+ // Create directories
+ UtilAll.ensureDirOK(separateRocksDBPath);
+ UtilAll.ensureDirOK(unifiedRocksDBPath);
+ }
+
+ @After
+ public void destroy() {
+
+ // Clean up test directories
+ UtilAll.deleteFile(new File(storePath));
+ }
+
+ @Test
+ public void testMigrationFromSeparateToUnifiedRocksDB() {
+
+ // First, create data in separate RocksDB mode
+ RocksDBTopicConfigManager separateManager = new RocksDBTopicConfigManager(brokerController, false);
+ separateManager.load();
+
+ // Add some topic configs
+ TopicConfig topicConfig = new TopicConfig(TEST_TOPIC, 4, 4);
+ separateManager.updateTopicConfig(topicConfig);
+ separateManager.persist();
+ separateManager.stop();
+
+ {
+ // Now create unified RocksDB manager which should migrate data
+ RocksDBTopicConfigManager unifiedManager = new RocksDBTopicConfigManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully", loaded);
+
+ // Verify that data was migrated
+ TopicConfig migratedConfig = unifiedManager.selectTopicConfig(TEST_TOPIC);
+ Assert.assertNotNull("Topic config should be migrated", migratedConfig);
+ Assert.assertEquals("Topic name should match", TEST_TOPIC, migratedConfig.getTopicName());
+ Assert.assertEquals("Read queue num should match", 4, migratedConfig.getReadQueueNums());
+ Assert.assertEquals("Write queue num should match", 4, migratedConfig.getWriteQueueNums());
+
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setWriteQueueNums(8);
+ unifiedManager.updateTopicConfig(topicConfig);
+ unifiedManager.persist();
+ unifiedManager.stop();
+ }
+
+ {
+ // Now create unified RocksDB manager which should migrate data
+ RocksDBTopicConfigManager unifiedManager = new RocksDBTopicConfigManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully", loaded);
+
+ // Verify that data was migrated
+ TopicConfig migratedConfig = unifiedManager.selectTopicConfig(TEST_TOPIC);
+ Assert.assertNotNull("Topic config should be migrated", migratedConfig);
+ Assert.assertEquals("Topic name should match", TEST_TOPIC, migratedConfig.getTopicName());
+ Assert.assertEquals("Read queue num should match", 8, migratedConfig.getReadQueueNums());
+ Assert.assertEquals("Write queue num should match", 8, migratedConfig.getWriteQueueNums());
+
+ unifiedManager.stop();
+ }
+
+ }
+
+ @Test
+ public void testMigrationWithNoSeparateRocksDB() {
+
+ // Ensure separate RocksDB doesn't exist
+ UtilAll.deleteFile(new File(separateRocksDBPath));
+
+ // Create unified RocksDB manager - should not fail even without separate DB
+ RocksDBTopicConfigManager unifiedManager = new RocksDBTopicConfigManager(brokerController, true);
+ boolean loaded = unifiedManager.load();
+ Assert.assertTrue("Unified manager should load successfully even without separate DB", loaded);
+
+ unifiedManager.stop();
+ }
+
+ @Test
+ public void testNoMigrationWhenDisabled() {
+ // Create data in separate RocksDB mode
+ RocksDBTopicConfigManager separateManager = new RocksDBTopicConfigManager(brokerController, false);
+ separateManager.load();
+
+ TopicConfig topicConfig = new TopicConfig(TEST_TOPIC, 4, 4);
+ separateManager.putTopicConfig(topicConfig);
+ separateManager.persist();
+ separateManager.stop();
+
+ // Create another separate manager - should not trigger migration
+ RocksDBTopicConfigManager anotherSeparateManager = new RocksDBTopicConfigManager(brokerController, false);
+ boolean loaded = anotherSeparateManager.load();
+ Assert.assertTrue("Separate manager should load successfully", loaded);
+
+ anotherSeparateManager.stop();
+ }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
index 191850f..1227d33 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
@@ -22,7 +22,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
-import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
@@ -49,6 +49,7 @@
brokerController = Mockito.mock(BrokerController.class);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+ Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
consumerOffsetManager = new RocksDBConsumerOffsetManager(brokerController);
consumerOffsetManager.load();
@@ -110,6 +111,6 @@
}
private boolean notToBeExecuted() {
- return MixAll.isMac();
+ return false;
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
index aa5003f..a2dbf60 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
@@ -50,7 +50,7 @@
@Before
public void setUp() {
brokerController = Mockito.mock(BrokerController.class);
- when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class));
+ when(brokerController.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
offsetManager = new RocksDBConsumerOffsetManager(brokerController);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 04828da..a464355 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -471,6 +471,12 @@
*/
private String configManagerVersion = ConfigManagerVersion.V1.getVersion();
+ /**
+ * Whether to use a single RocksDB instance with multiple column families for all configs
+ * instead of separate RocksDB instances for Topic, Group, and Offset configs
+ */
+ private boolean useSingleRocksDBForAllConfigs = false;
+
private boolean allowRecallWhenBrokerNotWriteable = true;
private boolean recallMessageEnable = false;
@@ -2116,6 +2122,14 @@
this.configManagerVersion = configManagerVersion;
}
+ public boolean isUseSingleRocksDBForAllConfigs() {
+ return useSingleRocksDBForAllConfigs;
+ }
+
+ public void setUseSingleRocksDBForAllConfigs(boolean useSingleRocksDBForAllConfigs) {
+ this.useSingleRocksDBForAllConfigs = useSingleRocksDBForAllConfigs;
+ }
+
public boolean isAllowRecallWhenBrokerNotWriteable() {
return allowRecallWhenBrokerNotWriteable;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index e087817..08a103b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Maps;
import io.netty.buffer.PooledByteBufAllocator;
+import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -506,7 +507,9 @@
//1. close column family handles
preShutdown();
- this.defaultCFHandle.close();
+ if (this.defaultCFHandle.isOwningHandle()) {
+ this.defaultCFHandle.close();
+ }
//2. close column family options.
for (final ColumnFamilyOptions opt : this.cfOptions) {
@@ -711,4 +714,22 @@
} catch (Exception ignored) {
}
}
+
+ public void destroy() {
+ recursiveDelete(new File(dbPath));
+ }
+
+ void recursiveDelete(File file) {
+ if (file.isFile()) {
+ if (file.delete()) {
+ LOGGER.info("Delete rocksdb file={}", file.getAbsolutePath());
+ }
+ } else {
+ File[] files = file.listFiles();
+ for (File f : files) {
+ recursiveDelete(f);
+ }
+ file.delete();
+ }
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 5fd9bab..e1802fd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -17,47 +17,50 @@
package org.apache.rocketmq.common.config;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
-import org.rocksdb.ReadOptions;
+import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
- public static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(StandardCharsets.UTF_8);
- public static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = "forbidden".getBytes(StandardCharsets.UTF_8);
+ public static final Charset CHARSET = StandardCharsets.UTF_8;
+ public static final ConcurrentMap<String, ConfigRocksDBStorage> STORE_MAP = new ConcurrentHashMap<>();
- protected ColumnFamilyHandle kvDataVersionFamilyHandle;
- protected ColumnFamilyHandle forbiddenFamilyHandle;
- public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
+ private final ConcurrentHashMap<String, ColumnFamilyHandle> columnFamilyNameHandleMap;
+ private ColumnFamilyOptions columnFamilyOptions;
-
-
- public ConfigRocksDBStorage(final String dbPath) {
- this(dbPath, false);
- }
-
- public ConfigRocksDBStorage(final String dbPath, CompressionType compressionType) {
- this(dbPath, false);
- this.compressionType = compressionType;
+ private ConfigRocksDBStorage(final String dbPath, boolean readOnly, CompressionType compressionType) {
+ super(dbPath);
+ this.readOnly = readOnly;
+ if (compressionType != null) {
+ this.compressionType = compressionType;
+ }
+ this.columnFamilyNameHandleMap = new ConcurrentHashMap<>();
}
public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
- super(dbPath);
- this.readOnly = readOnly;
+ this(dbPath, readOnly, null);
}
protected void initOptions() {
this.options = ConfigHelper.createConfigDBOptions();
+ this.columnFamilyOptions = ConfigHelper.createConfigColumnFamilyOptions();
+ this.cfOptions.add(columnFamilyOptions);
super.initOptions();
}
@@ -68,19 +71,20 @@
initOptions();
- final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+ List<byte[]> columnFamilyNames = new ArrayList<>(RocksDB.listColumnFamilies(
+ new Options(options, columnFamilyOptions), dbPath));
+ addIfNotExists(columnFamilyNames, RocksDB.DEFAULT_COLUMN_FAMILY);
- ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigColumnFamilyOptions();
- this.cfOptions.add(defaultOptions);
- cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
- cfDescriptors.add(new ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
- cfDescriptors.add(new ColumnFamilyDescriptor(FORBIDDEN_COLUMN_FAMILY_NAME, defaultOptions));
- open(cfDescriptors);
+ List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+ for (byte[] columnFamilyName : columnFamilyNames) {
+ cfDescriptors.add(new ColumnFamilyDescriptor(columnFamilyName, columnFamilyOptions));
+ }
- this.defaultCFHandle = cfHandles.get(0);
- this.kvDataVersionFamilyHandle = cfHandles.get(1);
- this.forbiddenFamilyHandle = cfHandles.get(2);
-
+ this.open(cfDescriptors);
+ for (int i = 0; i < columnFamilyNames.size(); i++) {
+ columnFamilyNameHandleMap.put(new String(columnFamilyNames.get(i), CHARSET), cfHandles.get(i));
+ }
+ this.defaultCFHandle = columnFamilyNameHandleMap.get(new String(RocksDB.DEFAULT_COLUMN_FAMILY, CHARSET));
} catch (final Exception e) {
AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e);
return false;
@@ -90,45 +94,16 @@
@Override
protected void preShutdown() {
- this.kvDataVersionFamilyHandle.close();
- this.forbiddenFamilyHandle.close();
+ for (final ColumnFamilyHandle columnFamilyHandle : this.columnFamilyNameHandleMap.values()) {
+ if (columnFamilyHandle.isOwningHandle()) {
+ columnFamilyHandle.close();
+ }
+ }
}
- public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
- put(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes, keyLen, valueBytes, valueBytes.length);
- }
-
- public void put(final ByteBuffer keyBB, final ByteBuffer valueBB) throws Exception {
- put(this.defaultCFHandle, this.ableWalWriteOptions, keyBB, valueBB);
- }
-
- public byte[] get(final byte[] keyBytes) throws Exception {
- return get(this.defaultCFHandle, this.totalOrderReadOptions, keyBytes);
- }
-
- public void updateKvDataVersion(final byte[] valueBytes) throws Exception {
- put(this.kvDataVersionFamilyHandle, this.ableWalWriteOptions, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length, valueBytes, valueBytes.length);
- }
-
- public byte[] getKvDataVersion() throws Exception {
- return get(this.kvDataVersionFamilyHandle, this.totalOrderReadOptions, KV_DATA_VERSION_KEY);
- }
-
- public void updateForbidden(final byte[] keyBytes, final byte[] valueBytes) throws Exception {
- put(this.forbiddenFamilyHandle, this.ableWalWriteOptions, keyBytes, keyBytes.length, valueBytes, valueBytes.length);
- }
-
- public byte[] getForbidden(final byte[] keyBytes) throws Exception {
- return get(this.forbiddenFamilyHandle, this.totalOrderReadOptions, keyBytes);
- }
-
- public void delete(final byte[] keyBytes) throws Exception {
- delete(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes);
- }
-
- public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList, final List<byte[]> keys) throws
- RocksDBException {
- return multiGet(this.totalOrderReadOptions, cfhList, keys);
+ // batch operations
+ public void writeBatchPutOperation(String cf, WriteBatch writeBatch, final byte[] key, final byte[] value) throws RocksDBException {
+ writeBatch.put(getOrCreateColumnFamily(cf), key, value);
}
public void batchPut(final WriteBatch batch) throws RocksDBException {
@@ -139,15 +114,100 @@
batchPut(this.ableWalWriteOptions, batch);
}
+
+ // operations with the specified cf
+ public void put(String cf, final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
+ put(getOrCreateColumnFamily(cf), this.ableWalWriteOptions, keyBytes, keyLen, valueBytes, valueBytes.length);
+ }
+
+ public void put(String cf, final ByteBuffer keyBB, final ByteBuffer valueBB) throws Exception {
+ put(getOrCreateColumnFamily(cf), this.ableWalWriteOptions, keyBB, valueBB);
+ }
+
+ public byte[] get(String cf, final byte[] keyBytes) throws Exception {
+ ColumnFamilyHandle columnFamilyHandle = columnFamilyNameHandleMap.get(cf);
+ if (columnFamilyHandle == null) {
+ return null;
+ }
+ return get(columnFamilyHandle, this.totalOrderReadOptions, keyBytes);
+ }
+
+ public void delete(String cf, final byte[] keyBytes) throws Exception {
+ ColumnFamilyHandle columnFamilyHandle = columnFamilyNameHandleMap.get(cf);
+ if (columnFamilyHandle == null) {
+ return;
+ }
+ delete(columnFamilyHandle, this.ableWalWriteOptions, keyBytes);
+ }
+
+ public void iterate(final String cf, BiConsumer<byte[], byte[]> biConsumer) throws RocksDBException {
+ if (!hold()) {
+ LOGGER.warn("RocksDBKvStore[path={}] has been shut down", dbPath);
+ return;
+ }
+ ColumnFamilyHandle columnFamilyHandle = columnFamilyNameHandleMap.get(cf);
+ if (columnFamilyHandle == null) {
+ return;
+ }
+ try (RocksIterator iterator = this.db.newIterator(columnFamilyHandle)) {
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+ biConsumer.accept(iterator.key(), iterator.value());
+ }
+ iterator.status();
+ }
+ }
+
public RocksIterator iterator() {
return this.db.newIterator(this.defaultCFHandle, this.totalOrderReadOptions);
}
- public RocksIterator forbiddenIterator() {
- return this.db.newIterator(this.forbiddenFamilyHandle, this.totalOrderReadOptions);
+ public ColumnFamilyHandle getOrCreateColumnFamily(String cf) throws RocksDBException {
+ if (!columnFamilyNameHandleMap.containsKey(cf)) {
+ if (readOnly) {
+ String errInfo = String.format("RocksDBKvStore[path=%s] is open as read-only", dbPath);
+ LOGGER.warn(errInfo);
+ throw new RocksDBException(errInfo);
+ }
+ synchronized (this) {
+ if (!columnFamilyNameHandleMap.containsKey(cf)) {
+ ColumnFamilyDescriptor columnFamilyDescriptor =
+ new ColumnFamilyDescriptor(cf.getBytes(CHARSET), columnFamilyOptions);
+ ColumnFamilyHandle columnFamilyHandle = db.createColumnFamily(columnFamilyDescriptor);
+ columnFamilyNameHandleMap.putIfAbsent(cf, columnFamilyHandle);
+ cfHandles.add(columnFamilyHandle);
+ }
+ }
+ }
+ return columnFamilyNameHandleMap.get(cf);
}
- public RocksIterator iterator(ReadOptions readOptions) {
- return this.db.newIterator(this.defaultCFHandle, readOptions);
+ public void addIfNotExists(List<byte[]> columnFamilyNames, byte[] byteArray) {
+ if (columnFamilyNames.stream().noneMatch(array -> Arrays.equals(array, byteArray))) {
+ columnFamilyNames.add(byteArray);
+ }
+ }
+
+ public static ConfigRocksDBStorage getStore(String path, boolean readOnly, CompressionType compressionType) {
+ return ConcurrentHashMapUtils.computeIfAbsent(STORE_MAP, path,
+ k -> new ConfigRocksDBStorage(path, readOnly, compressionType));
+ }
+
+ public static ConfigRocksDBStorage getStore(String path, boolean readOnly) {
+ return getStore(path, readOnly, null);
+ }
+
+ public static void shutdown(String path) {
+ ConfigRocksDBStorage kvStore = STORE_MAP.remove(path);
+ if (kvStore != null) {
+ kvStore.shutdown();
+ }
+ }
+
+ public static void destroy(String path) {
+ ConfigRocksDBStorage kvStore = STORE_MAP.remove(path);
+ if (kvStore != null) {
+ kvStore.shutdown();
+ kvStore.destroy();
+ }
}
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index 48bc163..94899fc 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -192,8 +192,12 @@
return loadConsumerOffsets(path);
}
- ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
- configRocksDBStorage.start();
+ ConfigRocksDBStorage configRocksDBStorage = ConfigRocksDBStorage.getStore(path, true);
+ if (!configRocksDBStorage.start()) {
+ System.out.print("Failed to initialize ConfigRocksDBStorage.\n");
+ return null;
+ }
+
RocksIterator iterator = configRocksDBStorage.iterator();
try {
final Map<String, JSONObject> configMap = new HashMap<>();
@@ -208,10 +212,17 @@
configTable.put(name, jsonObject);
iterator.next();
}
- byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion();
- if (kvDataVersion != null) {
- configMap.put("dataVersion",
- JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
+
+ // Try to get data version
+ try {
+ byte[] kvDataVersion = configRocksDBStorage.get("kvDataVersion",
+ "kvDataVersionKey".getBytes(DataConverter.CHARSET_UTF8));
+ if (kvDataVersion != null) {
+ configMap.put("dataVersion",
+ JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
+ }
+ } catch (Exception e) {
+ // Ignore if data version is not available
}
if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS.equals(configType)) {
@@ -224,7 +235,7 @@
} catch (Exception e) {
System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n");
} finally {
- configRocksDBStorage.shutdown();
+ ConfigRocksDBStorage.shutdown(path);
}
return null;
}
@@ -284,8 +295,12 @@
}
private static Map<String, JSONObject> loadConsumerOffsets(String path) {
- ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
- configRocksDBStorage.start();
+ ConfigRocksDBStorage configRocksDBStorage = ConfigRocksDBStorage.getStore(path, true);
+ if (!configRocksDBStorage.start()) {
+ System.out.print("Failed to initialize ConfigRocksDBStorage for consumer offsets.\n");
+ return null;
+ }
+
RocksIterator iterator = configRocksDBStorage.iterator();
try {
final Map<String, JSONObject> configMap = new HashMap<>();
@@ -305,7 +320,7 @@
} catch (Exception e) {
System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=consumerOffsets, " + e.getMessage() + "\n");
} finally {
- configRocksDBStorage.shutdown();
+ ConfigRocksDBStorage.shutdown(path);
}
return null;
}