[ISSUE #9773] Implement Shared RocksDB Instance for Broker Configs (#9774)

diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
index dcf9061..04e745e 100644
--- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
+++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
@@ -35,17 +35,20 @@
 import org.apache.rocketmq.auth.config.AuthConfig;
 import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksDB;
 
 public class LocalAuthenticationMetadataProvider implements AuthenticationMetadataProvider {
 
+    private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
+        StandardCharsets.UTF_8);
+
     private ConfigRocksDBStorage storage;
 
     private LoadingCache<String, User> userCache;
 
     @Override
     public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
-        this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "users");
+        this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "users", false);
         if (!this.storage.start()) {
             throw new RuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied");
         }
@@ -72,7 +75,7 @@
         try {
             byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
             byte[] valueBytes = JSON.toJSONBytes(user);
-            this.storage.put(keyBytes, keyBytes.length, valueBytes);
+            this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
             this.storage.flushWAL();
             this.userCache.invalidate(user.getUsername());
         } catch (Exception e) {
@@ -84,7 +87,7 @@
     @Override
     public CompletableFuture<Void> deleteUser(String username) {
         try {
-            this.storage.delete(username.getBytes(StandardCharsets.UTF_8));
+            this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, username.getBytes(StandardCharsets.UTF_8));
             this.storage.flushWAL();
             this.userCache.invalidate(username);
         } catch (Exception e) {
@@ -98,7 +101,7 @@
         try {
             byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
             byte[] valueBytes = JSON.toJSONBytes(user);
-            this.storage.put(keyBytes, keyBytes.length, valueBytes);
+            this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
             this.storage.flushWAL();
             this.userCache.invalidate(user.getUsername());
         } catch (Exception e) {
@@ -119,20 +122,21 @@
     @Override
     public CompletableFuture<List<User>> listUser(String filter) {
         List<User> result = new ArrayList<>();
-        try (RocksIterator iterator = this.storage.iterator()) {
-            iterator.seekToFirst();
-            while (iterator.isValid()) {
-                String username = new String(iterator.key(), StandardCharsets.UTF_8);
+        CompletableFuture<List<User>> future = new CompletableFuture<>();
+        try {
+            this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
+                String username = new String(key, StandardCharsets.UTF_8);
                 if (StringUtils.isNotBlank(filter) && !username.contains(filter)) {
-                    iterator.next();
-                    continue;
+                    return;
                 }
-                User user = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), User.class);
+                User user = JSON.parseObject(new String(value, StandardCharsets.UTF_8), User.class);
                 result.add(user);
-                iterator.next();
-            }
+            });
+        } catch (Exception e) {
+            future.completeExceptionally(e);
         }
-        return CompletableFuture.completedFuture(result);
+        future.complete(result);
+        return future;
     }
 
     @Override
@@ -154,7 +158,7 @@
         public User load(String username) {
             try {
                 byte[] keyBytes = username.getBytes(StandardCharsets.UTF_8);
-                byte[] valueBytes = storage.get(keyBytes);
+                byte[] valueBytes = storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
                 if (ArrayUtils.isEmpty(valueBytes)) {
                     return EMPTY_USER;
                 }
diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
index 54d8870..6db999b 100644
--- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
+++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
@@ -40,17 +40,20 @@
 import org.apache.rocketmq.auth.config.AuthConfig;
 import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksDB;
 
 public class LocalAuthorizationMetadataProvider implements AuthorizationMetadataProvider {
 
+    private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
+        StandardCharsets.UTF_8);
+
     private ConfigRocksDBStorage storage;
 
     private LoadingCache<String, Acl> aclCache;
 
     @Override
     public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
-        this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "acls");
+        this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "acls", false);
         if (!this.storage.start()) {
             throw new RuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied.");
         }
@@ -77,7 +80,7 @@
             Subject subject = acl.getSubject();
             byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
             byte[] valueBytes = JSON.toJSONBytes(acl);
-            this.storage.put(keyBytes, keyBytes.length, valueBytes);
+            this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
             this.storage.flushWAL();
             this.aclCache.invalidate(subject.getSubjectKey());
         } catch (Exception e) {
@@ -90,7 +93,7 @@
     public CompletableFuture<Void> deleteAcl(Subject subject) {
         try {
             byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
-            this.storage.delete(keyBytes);
+            this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
             this.storage.flushWAL();
             this.aclCache.invalidate(subject.getSubjectKey());
         } catch (Exception e) {
@@ -105,7 +108,7 @@
             Subject subject = acl.getSubject();
             byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
             byte[] valueBytes = JSON.toJSONBytes(acl);
-            this.storage.put(keyBytes, keyBytes.length, valueBytes);
+            this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
             this.storage.flushWAL();
             this.aclCache.invalidate(subject.getSubjectKey());
         } catch (Exception e) {
@@ -126,20 +129,18 @@
     @Override
     public CompletableFuture<List<Acl>> listAcl(String subjectFilter, String resourceFilter) {
         List<Acl> result = new ArrayList<>();
-        try (RocksIterator iterator = this.storage.iterator()) {
-            iterator.seekToFirst();
-            while (iterator.isValid()) {
-                String subjectKey = new String(iterator.key(), StandardCharsets.UTF_8);
+        CompletableFuture<List<Acl>> future = new CompletableFuture<>();
+        try {
+            this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
+                String subjectKey = new String(key, StandardCharsets.UTF_8);
                 if (StringUtils.isNotBlank(subjectFilter) && !subjectKey.contains(subjectFilter)) {
-                    iterator.next();
-                    continue;
+                    return;
                 }
                 Subject subject = Subject.of(subjectKey);
-                Acl acl = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), Acl.class);
+                Acl acl = JSON.parseObject(new String(value, StandardCharsets.UTF_8), Acl.class);
                 List<Policy> policies = acl.getPolicies();
                 if (!CollectionUtils.isNotEmpty(policies)) {
-                    iterator.next();
-                    continue;
+                    return;
                 }
                 Iterator<Policy> policyIterator = policies.iterator();
                 while (policyIterator.hasNext()) {
@@ -158,10 +159,12 @@
                 if (CollectionUtils.isNotEmpty(policies)) {
                     result.add(Acl.of(subject, policies));
                 }
-                iterator.next();
-            }
+            });
+        } catch (Exception e) {
+            future.completeExceptionally(e);
         }
-        return CompletableFuture.completedFuture(result);
+        future.complete(result);
+        return future;
     }
 
     @Override
@@ -185,7 +188,7 @@
                 byte[] keyBytes = subjectKey.getBytes(StandardCharsets.UTF_8);
                 Subject subject = Subject.of(subjectKey);
 
-                byte[] valueBytes = this.storage.get(keyBytes);
+                byte[] valueBytes = this.storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
                 if (ArrayUtils.isEmpty(valueBytes)) {
                     return EMPTY_ACL;
                 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java
similarity index 63%
rename from broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java
index ee2d4e5..4ebdce1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker;
+package org.apache.rocketmq.broker.config.v1;
 
 import com.alibaba.fastjson.JSON;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.function.BiConsumer;
 import org.apache.commons.lang3.StringUtils;
@@ -27,12 +28,16 @@
 import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.rocksdb.CompressionType;
 import org.rocksdb.FlushOptions;
-import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
 import org.rocksdb.Statistics;
 import org.rocksdb.WriteBatch;
 
 public class RocksDBConfigManager {
     protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    public static final Charset CHARSET = StandardCharsets.UTF_8;
+
     public volatile boolean isStop = false;
     public ConfigRocksDBStorage configRocksDBStorage = null;
     private FlushOptions flushOptions = null;
@@ -42,21 +47,44 @@
     private final CompressionType compressionType;
     private DataVersion kvDataVersion = new DataVersion();
 
+    public static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(CHARSET);
+    public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(CHARSET);
+
+    private final String defaultCF;
+    private final String versionCF;
+
+    /** Creates a manager that keeps entries in {@code defaultCF} and its data version in {@code versionCF}. */
+    public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType,
+        String defaultCF, String versionCF) {
+        this.filePath = filePath;
+        this.memTableFlushInterval = memTableFlushInterval;
+        this.compressionType = compressionType;
+        this.defaultCF = defaultCF;
+        this.versionCF = versionCF;
+    }
+
     public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType) {
         this.filePath = filePath;
         this.memTableFlushInterval = memTableFlushInterval;
         this.compressionType = compressionType;
+        this.defaultCF = new String(RocksDB.DEFAULT_COLUMN_FAMILY, CHARSET);
+        this.versionCF = new String(KV_DATA_VERSION_COLUMN_FAMILY_NAME, CHARSET);
+    }
+
+    public boolean init(boolean readOnly) {
+        this.isStop = false;
+        this.configRocksDBStorage = ConfigRocksDBStorage.getStore(filePath, readOnly, compressionType);
+        return this.configRocksDBStorage.start();
     }
 
     public boolean init() {
-        this.isStop = false;
-        this.configRocksDBStorage = new ConfigRocksDBStorage(filePath, compressionType);
-        return this.configRocksDBStorage.start();
+        return this.init(false);
     }
+
     public boolean loadDataVersion() {
         String currDataVersionString = null;
         try {
-            byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
+            byte[] dataVersion = this.configRocksDBStorage.get(versionCF, KV_DATA_VERSION_KEY);
             if (dataVersion != null && dataVersion.length > 0) {
                 currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8);
             }
@@ -68,12 +96,11 @@
     }
 
     public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
-        try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
-            iterator.seekToFirst();
-            while (iterator.isValid()) {
-                biConsumer.accept(iterator.key(), iterator.value());
-                iterator.next();
-            }
+        try {
+            configRocksDBStorage.iterate(this.defaultCF, biConsumer);
+        } catch (Exception e) {
+            BROKER_LOG.error("RocksDBConfigManager loadData failed", e);
+            return false;
         }
 
         this.flushOptions = new FlushOptions();
@@ -87,9 +114,7 @@
 
     public boolean stop() {
         this.isStop = true;
-        if (this.configRocksDBStorage != null) {
-            return this.configRocksDBStorage.shutdown();
-        }
+        ConfigRocksDBStorage.shutdown(filePath);
         if (this.flushOptions != null) {
             this.flushOptions.close();
         }
@@ -115,28 +140,38 @@
         }
     }
 
-    public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
-        this.configRocksDBStorage.put(keyBytes, keyLen, valueBytes);
+    public void put(final byte[] keyBytes, final byte[] valueBytes) throws Exception {
+        this.configRocksDBStorage.put(defaultCF, keyBytes, keyBytes.length, valueBytes);
+    }
+
+    public void put(String cf, String key, String value) throws Exception {
+        // Encode key and value with the shared charset, then reuse the byte[] overload for the write.
+        this.put(cf, key.getBytes(CHARSET), value.getBytes(CHARSET));
+    }
+
+    public void put(String cf, final byte[] keyBytes, final byte[] valueBytes) throws Exception {
+        this.configRocksDBStorage.put(cf, keyBytes, keyBytes.length, valueBytes);
     }
 
     public void delete(final byte[] keyBytes) throws Exception {
-        this.configRocksDBStorage.delete(keyBytes);
+        this.configRocksDBStorage.delete(defaultCF, keyBytes);
     }
 
     public void updateKvDataVersion() throws Exception {
         kvDataVersion.nextVersion();
-        this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
+        this.configRocksDBStorage.put(versionCF, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length,
+            JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
     }
 
     public DataVersion getKvDataVersion() {
         return kvDataVersion;
     }
 
-    public void updateForbidden(String key, String value) throws Exception {
-        this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    // batch operations
+    public void writeBatchPutOperation(WriteBatch writeBatch, final byte[] key, final byte[] value) throws RocksDBException {
+        configRocksDBStorage.writeBatchPutOperation(defaultCF, writeBatch, key, value);
     }
 
-
     public void batchPutWithWal(final WriteBatch batch) throws Exception {
         this.configRocksDBStorage.batchPutWithWal(batch);
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 963c504..2d015af 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -18,12 +18,13 @@
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import java.io.File;
-import java.util.Iterator;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -38,13 +39,37 @@
 
     protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
+    private static final String VERSION_COLUMN_FAMILY = "consumerOffsetVersion";
+    private static final String OFFSET_COLUMN_FAMILY = "consumerOffset";
+
     protected transient RocksDBConfigManager rocksDBConfigManager;
+    private final boolean useSingleRocksDBForAllConfigs;
+    private final String storePathRootDir;
+
+    /** Creates the manager on the shared RocksDB (dedicated column families) or on the separate one. */
+    public RocksDBConsumerOffsetManager(BrokerController brokerController, boolean useSingleRocksDB,
+        String storePathRootDir) {
+        super(brokerController);
+        this.useSingleRocksDBForAllConfigs = useSingleRocksDB;
+        this.storePathRootDir = StringUtils.isBlank(storePathRootDir) ?
+            brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+        long flushIntervalMs = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+        CompressionType compression =
+            CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+        String dbPath = rocksdbConfigFilePath(storePathRootDir, useSingleRocksDB);
+        if (useSingleRocksDB) {
+            this.rocksDBConfigManager = new RocksDBConfigManager(dbPath, flushIntervalMs, compression, OFFSET_COLUMN_FAMILY, VERSION_COLUMN_FAMILY);
+        } else {
+            this.rocksDBConfigManager = new RocksDBConfigManager(dbPath, flushIntervalMs, compression);
+        }
+    }
+
+    public RocksDBConsumerOffsetManager(BrokerController brokerController, boolean useSingleRocksDBForAllConfigs) {
+        this(brokerController, useSingleRocksDBForAllConfigs, null);
+    }
 
     public RocksDBConsumerOffsetManager(BrokerController brokerController) {
-        super(brokerController);
-        this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
-            CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
-
+        this(brokerController, brokerController.getBrokerConfig().isUseSingleRocksDBForAllConfigs(), null);
     }
 
     @Override
@@ -55,7 +80,9 @@
         if (!loadDataVersion() || !loadConsumerOffset()) {
             return false;
         }
-
+        if (useSingleRocksDBForAllConfigs) {
+            migrateFromSeparateRocksDBs();
+        }
         return true;
     }
 
@@ -112,19 +139,27 @@
         log.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
     }
 
-    public String rocksdbConfigFilePath() {
-        return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
+    /** Resolves the RocksDB directory under {@code <root>/config} for shared or separate mode. */
+    public String rocksdbConfigFilePath(String storePathRootDir, boolean useSingleRocksDBForAllConfigs) {
+        // A blank root falls back to the store root configured on the broker.
+        String rootDir = StringUtils.isBlank(storePathRootDir) ?
+            brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+        Path configDir = Paths.get(rootDir).resolve("config");
+        // Shared mode uses the common "metadata" directory; otherwise offsets get their own directory.
+        String dbDirName = useSingleRocksDBForAllConfigs ? "metadata" : "consumerOffsets";
+        return configDir.resolve(dbDirName).toString();
+    }
+
+    @Override
+    public String configFilePath() {
+        return BrokerPathConfigHelper.getConsumerOffsetPath(this.storePathRootDir);
     }
 
     @Override
     public synchronized void persist() {
-        WriteBatch writeBatch = new WriteBatch();
-        try {
-            Iterator<Entry<String, ConcurrentMap<Integer, Long>>> iterator = this.offsetTable.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Entry<String, ConcurrentMap<Integer, Long>> entry = iterator.next();
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            for (Entry<String, ConcurrentMap<Integer, Long>> entry : this.offsetTable.entrySet()) {
                 putWriteBatch(writeBatch, entry.getKey(), entry.getValue());
-
                 if (writeBatch.getDataSize() >= 4 * 1024) {
                     this.rocksDBConfigManager.batchPutWithWal(writeBatch);
                 }
@@ -133,8 +168,6 @@
             this.rocksDBConfigManager.flushWAL();
         } catch (Exception e) {
             log.error("consumer offset persist Failed", e);
-        } finally {
-            writeBatch.close();
         }
     }
 
@@ -148,7 +181,7 @@
         RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
         wrapper.setOffsetTable(offsetMap);
         byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
-        writeBatch.put(keyBytes, valueBytes);
+        rocksDBConfigManager.writeBatchPutOperation(writeBatch, keyBytes, valueBytes);
     }
 
     @Override
@@ -161,6 +194,7 @@
         return rocksDBConfigManager.getKvDataVersion();
     }
 
+    @Override
     public void updateDataVersion() {
         try {
             rocksDBConfigManager.updateKvDataVersion();
@@ -169,4 +203,98 @@
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Migrate data from the separate consumer offset RocksDB into the unified RocksDB when
+     * useSingleRocksDBForAllConfigs is enabled. The separate store is opened in read-only mode and its
+     * entries are imported only when its data version counter is greater than the unified one.
+     * Exceptions raised along the way are logged rather than propagated to the caller.
+     */
+    private void migrateFromSeparateRocksDBs() {
+        String separateRocksDBPath = rocksdbConfigFilePath(this.storePathRootDir, false);
+
+        // Check if separate RocksDB exists
+        if (!UtilAll.isPathExists(separateRocksDBPath)) {
+            log.info("Separate RocksDB for consumer offsets does not exist at {}, no migration needed",
+                separateRocksDBPath);
+            return;
+        }
+
+        log.info("Starting migration from separate RocksDB at {} to unified RocksDB", separateRocksDBPath);
+
+        // Open separate RocksDB in read-only mode
+        RocksDBConfigManager separateRocksDBConfigManager = null;
+        try {
+            long memTableFlushIntervalMs = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+            CompressionType compressionType = CompressionType.getCompressionType(
+                brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+
+            separateRocksDBConfigManager = new RocksDBConfigManager(separateRocksDBPath, memTableFlushIntervalMs,
+                compressionType);
+
+            // Initialize in read-only mode
+            if (!separateRocksDBConfigManager.init(true)) {
+                log.error("Failed to initialize separate RocksDB in read-only mode");
+                return;
+            }
+
+            // Load data version from separate RocksDB
+            if (!separateRocksDBConfigManager.loadDataVersion()) {
+                log.error("Failed to load data version from separate RocksDB");
+                return;
+            }
+
+            DataVersion separateDataVersion = separateRocksDBConfigManager.getKvDataVersion();
+            DataVersion unifiedDataVersion = this.getDataVersion();
+
+            log.info("Comparing data versions - Separate: {}, Unified: {}", separateDataVersion, unifiedDataVersion);
+
+            // Compare versions and import if separate version is newer
+            if (separateDataVersion.getCounter().get() > unifiedDataVersion.getCounter().get()) {
+                log.info("Separate RocksDB has newer data, importing...");
+
+                // Load consumer offsets from separate RocksDB
+                if (separateRocksDBConfigManager.loadData(this::importConsumerOffset)) {
+                    log.info("Successfully imported consumer offsets from separate RocksDB");
+
+                    // Adopt the separate version; updateDataVersion() advances the counter once and persists
+                    // it, so the unified version ends up exactly one higher than the separate one.
+                    this.getDataVersion().assignNewOne(separateDataVersion);
+                    updateDataVersion();
+
+                    log.info("Updated unified data version to {}", this.getDataVersion());
+                } else {
+                    log.error("Failed to import consumer offsets from separate RocksDB");
+                }
+            } else {
+                log.info("Unified RocksDB is already up-to-date, no migration needed");
+            }
+        } catch (Exception e) {
+            log.error("Error during migration from separate RocksDB", e);
+        } finally {
+            // Clean up resources
+            if (separateRocksDBConfigManager != null) {
+                try {
+                    separateRocksDBConfigManager.stop();
+                } catch (Exception e) {
+                    log.warn("Error stopping separate RocksDB config manager", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Imports one migrated entry: decodes it with decodeOffset, then writes the raw key/value through
+     * rocksDBConfigManager. Exceptions are logged and swallowed, so a failing entry is skipped (review: intended?).
+     * @param key  The topic@group name bytes
+     * @param body The consumer offset data bytes
+     */
+    private void importConsumerOffset(final byte[] key, final byte[] body) {
+        try {
+            decodeOffset(key, body);
+            this.rocksDBConfigManager.put(key, body);
+        } catch (Exception e) {
+            log.error("Error importing consumer offset", e);
+        }
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index f7e0de9..f6ae3a3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -19,30 +19,57 @@
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.rocksdb.CompressionType;
-import org.rocksdb.RocksIterator;
 
 public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
 
     protected transient RocksDBConfigManager rocksDBConfigManager;
 
-    public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
+    private static final String VERSION_COLUMN_FAMILY = "subscriptionGroupVersion";
+    private static final String GROUP_COLUMN_FAMILY = "subscriptionGroup";
+    private static final String FORBIDDEN_COLUMN_FAMILY_NAME = "forbidden";
+
+    private final boolean useSingleRocksDBForAllConfigs;
+    private final String storePathRootDir;
+
+    public RocksDBSubscriptionGroupManager(BrokerController brokerController, boolean useSingleRocksDB,
+        String storePathRootDir) {
         super(brokerController, false);
-        this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
-            CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
+
+        this.useSingleRocksDBForAllConfigs = useSingleRocksDB;
+        this.storePathRootDir = StringUtils.isBlank(storePathRootDir) ?
+            brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+
+        long flushInterval = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+        CompressionType compressionType =
+            CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+        String rocksDBPath = rocksdbConfigFilePath(storePathRootDir, useSingleRocksDB);
+
+        this.rocksDBConfigManager = useSingleRocksDB ? new RocksDBConfigManager(rocksDBPath, flushInterval,
+            compressionType, GROUP_COLUMN_FAMILY, VERSION_COLUMN_FAMILY) : new RocksDBConfigManager(rocksDBPath,
+            flushInterval, compressionType);
+    }
+
+    public RocksDBSubscriptionGroupManager(BrokerController brokerController, boolean useSingleRocksDBForAllConfigs) {
+        this(brokerController, useSingleRocksDBForAllConfigs, null);
+    }
+
+    public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
+        this(brokerController, brokerController.getBrokerConfig().isUseSingleRocksDBForAllConfigs(), null);
     }
 
     @Override
@@ -53,6 +80,9 @@
         if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) {
             return false;
         }
+        if (useSingleRocksDBForAllConfigs) {
+            migrateFromSeparateRocksDBs();
+        }
         this.init();
         return true;
     }
@@ -68,14 +98,13 @@
     }
 
     public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {
-        try (RocksIterator iterator = this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) {
-            iterator.seekToFirst();
-            while (iterator.isValid()) {
-                biConsumer.accept(iterator.key(), iterator.value());
-                iterator.next();
-            }
+        try {
+            this.rocksDBConfigManager.configRocksDBStorage.iterate(FORBIDDEN_COLUMN_FAMILY_NAME, biConsumer);
+            return true;
+        } catch (Exception e) {
+            log.error("loadForbidden exception", e);
         }
-        return true;
+        return false;
     }
 
     private boolean merge() {
@@ -102,7 +131,8 @@
             final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = this.getForbiddenTable();
             for (Map.Entry<String, ConcurrentMap<String, Integer>> entry : forbiddenTable.entrySet()) {
                 try {
-                    this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue()));
+                    this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, entry.getKey(),
+                        JSON.toJSONString(entry.getValue()));
                     log.info("import forbidden config to rocksdb, group={}", entry.getValue());
                 } catch (Exception e) {
                     log.error("import forbidden config to rocksdb failed, group={}", entry.getValue());
@@ -131,9 +161,9 @@
         SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
 
         try {
-            byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
+            byte[] keyBytes = groupName.getBytes(RocksDBConfigManager.CHARSET);
             byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
-            this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes);
+            this.rocksDBConfigManager.put(keyBytes, valueBytes);
         } catch (Exception e) {
             log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString());
         }
@@ -146,9 +176,9 @@
         SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.putIfAbsent(groupName, subscriptionGroupConfig);
         if (oldConfig == null) {
             try {
-                byte[] keyBytes = groupName.getBytes(DataConverter.CHARSET_UTF8);
+                byte[] keyBytes = groupName.getBytes(RocksDBConfigManager.CHARSET);
                 byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
-                this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes);
+                this.rocksDBConfigManager.put(keyBytes, valueBytes);
             } catch (Exception e) {
                 log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString());
             }
@@ -160,7 +190,7 @@
     protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
         SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.remove(groupName);
         try {
-            this.rocksDBConfigManager.delete(groupName.getBytes(DataConverter.CHARSET_UTF8));
+            this.rocksDBConfigManager.delete(groupName.getBytes(RocksDBConfigManager.CHARSET));
         } catch (Exception e) {
             log.error("kv delete sub Failed, {}", subscriptionGroupConfig.toString());
         }
@@ -169,7 +199,7 @@
 
 
     protected void decodeSubscriptionGroup(byte[] key, byte[] body) {
-        String groupName = new String(key, DataConverter.CHARSET_UTF8);
+        String groupName = new String(key, RocksDBConfigManager.CHARSET);
         SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(body, SubscriptionGroupConfig.class);
 
         this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
@@ -188,8 +218,20 @@
         super.persist();
     }
 
-    public String rocksdbConfigFilePath() {
-        return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator;
+    public String rocksdbConfigFilePath(String storePathRootDir, boolean useSingleRocksDBForAllConfigs) {
+        // A blank root dir falls back to the one configured for the message store.
+        String rootDir = StringUtils.isBlank(storePathRootDir) ?
+            brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+        // Unified mode uses the "metadata" directory; separate mode uses "subscriptionGroups".
+        String dbDirName = useSingleRocksDBForAllConfigs ? "metadata" : "subscriptionGroups";
+        Path configDir = Paths.get(rootDir).resolve("config");
+
+        return configDir.resolve(dbDirName).toString();
+    }
+
+    @Override
+    public String configFilePath() {
+        return BrokerPathConfigHelper.getSubscriptionGroupPath(this.storePathRootDir);
     }
 
     @Override
@@ -208,8 +250,8 @@
     }
 
     protected void decodeForbidden(byte[] key, byte[] body) {
-        String forbiddenGroupName = new String(key, DataConverter.CHARSET_UTF8);
-        JSONObject jsonObject = JSON.parseObject(new String(body, DataConverter.CHARSET_UTF8));
+        String forbiddenGroupName = new String(key, RocksDBConfigManager.CHARSET);
+        JSONObject jsonObject = JSON.parseObject(new String(body, RocksDBConfigManager.CHARSET));
         Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
         ConcurrentMap<String, Integer> forbiddenGroup = new ConcurrentHashMap<>(entries.size());
         for (Map.Entry<String, Object> entry : entries) {
@@ -223,7 +265,8 @@
     public void updateForbidden(String group, String topic, int forbiddenIndex, boolean setOrClear) {
         try {
             super.updateForbidden(group, topic, forbiddenIndex, setOrClear);
-            this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
+            this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, group,
+                JSON.toJSONString(this.getForbiddenTable().get(group)));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -233,7 +276,8 @@
     public void setForbidden(String group, String topic, int forbiddenIndex) {
         try {
             super.setForbidden(group, topic, forbiddenIndex);
-            this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
+            this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, group,
+                JSON.toJSONString(this.getForbiddenTable().get(group)));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -243,9 +287,131 @@
     public void clearForbidden(String group, String topic, int forbiddenIndex) {
         try {
             super.clearForbidden(group, topic, forbiddenIndex);
-            this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
+            this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, group,
+                JSON.toJSONString(this.getForbiddenTable().get(group)));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Migrates data from the separate RocksDB instance into the unified RocksDB. It runs after the unified data
+     * has been loaded, whenever useSingleRocksDBForAllConfigs is enabled, and does nothing if the separate
+     * RocksDB path does not exist. Otherwise it opens that store in read-only mode and imports its contents
+     * only when its data version counter is greater than the unified one.
+     */
+    private void migrateFromSeparateRocksDBs() {
+        String separateRocksDBPath = rocksdbConfigFilePath(this.storePathRootDir, false);
+
+        // No separate RocksDB directory means there is nothing to migrate
+        if (!org.apache.rocketmq.common.UtilAll.isPathExists(separateRocksDBPath)) {
+            log.info("Separate RocksDB for subscription groups does not exist at {}, no migration needed",
+                separateRocksDBPath);
+            return;
+        }
+
+        log.info("Starting migration from separate RocksDB at {} to unified RocksDB", separateRocksDBPath);
+
+        // Open separate RocksDB in read-only mode; it stays null until constructed so finally can skip stop()
+        RocksDBConfigManager separateRocksDBConfigManager = null;
+        try {
+            long memTableFlushIntervalMs = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+            CompressionType compressionType = CompressionType.getCompressionType(
+                brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+
+            separateRocksDBConfigManager = new RocksDBConfigManager(separateRocksDBPath, memTableFlushIntervalMs,
+                compressionType);
+
+            // Initialize in read-only mode
+            if (!separateRocksDBConfigManager.init(true)) {
+                log.error("Failed to initialize separate RocksDB in read-only mode");
+                return;
+            }
+
+            // Load data version from separate RocksDB
+            if (!separateRocksDBConfigManager.loadDataVersion()) {
+                log.error("Failed to load data version from separate RocksDB");
+                return;
+            }
+
+            org.apache.rocketmq.remoting.protocol.DataVersion separateDataVersion =
+                separateRocksDBConfigManager.getKvDataVersion();
+            org.apache.rocketmq.remoting.protocol.DataVersion unifiedDataVersion = this.getDataVersion();
+
+            log.info("Comparing data versions - Separate: {}, Unified: {}", separateDataVersion, unifiedDataVersion);
+
+            // Only the version counters are compared; import when the separate store is strictly ahead
+            if (separateDataVersion.getCounter().get() > unifiedDataVersion.getCounter().get()) {
+                log.info("Separate RocksDB has newer data, importing...");
+
+                // Load subscription groups from separate RocksDB
+                boolean success = separateRocksDBConfigManager.loadData(this::importSubscriptionGroup);
+                if (success) {
+                    // Load forbidden data directly using the storage
+                    try {
+                        separateRocksDBConfigManager.configRocksDBStorage.iterate(FORBIDDEN_COLUMN_FAMILY_NAME,
+                            this::importForbidden);
+                        log.info("Successfully imported subscription groups and forbidden data from separate RocksDB");
+
+                        // Adopt the separate version, then bump it, so the counter check above fails next time
+                        this.getDataVersion().assignNewOne(separateDataVersion);
+                        this.getDataVersion().nextVersion(); // Make it one version higher
+                        updateDataVersion();
+
+                        log.info("Updated unified data version to {}", this.getDataVersion());
+                    } catch (Exception e) {
+                        log.error("Failed to import forbidden data from separate RocksDB", e);
+                        success = false;
+                    }
+                }
+
+                if (!success) {
+                    log.error("Failed to import subscription groups or forbidden data from separate RocksDB");
+                }
+            } else {
+                log.info("Unified RocksDB is already up-to-date, no migration needed");
+            }
+        } catch (Exception e) {
+            log.error("Error during migration from separate RocksDB", e);
+        } finally {
+            // Always release the separate store, including after the early returns above
+            if (separateRocksDBConfigManager != null) {
+                try {
+                    separateRocksDBConfigManager.stop();
+                } catch (Exception e) {
+                    log.warn("Error stopping separate RocksDB config manager", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Import a subscription group from the separate RocksDB during migration
+     *
+     * @param key  The group name bytes
+     * @param body The subscription group data bytes
+     */
+    private void importSubscriptionGroup(byte[] key, byte[] body) {
+        try {
+            decodeSubscriptionGroup(key, body);
+            this.rocksDBConfigManager.put(key, body);
+        } catch (Exception e) {
+            log.error("Error importing subscription group {}", new String(key, RocksDBConfigManager.CHARSET), e);
+        }
+    }
+
+    /**
+     * Import forbidden data from the separate RocksDB during migration
+     *
+     * @param key  The group name bytes
+     * @param body The forbidden data bytes
+     */
+    private void importForbidden(byte[] key, byte[] body) {
+        try {
+            decodeForbidden(key, body);
+            this.rocksDBConfigManager.put(FORBIDDEN_COLUMN_FAMILY_NAME, key, body);
+        } catch (Exception e) {
+            log.error("Error importing forbidden data {}", new String(key, RocksDBConfigManager.CHARSET), e);
+        }
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index d64f808..4a8d124 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -18,11 +18,13 @@
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -31,13 +33,37 @@
 import org.rocksdb.CompressionType;
 
 public class RocksDBTopicConfigManager extends TopicConfigManager {
+    private static final String VERSION_COLUMN_FAMILY = "topicVersion";
+    private static final String TOPIC_COLUMN_FAMILY = "topic";
 
     protected transient RocksDBConfigManager rocksDBConfigManager;
+    private final boolean useSingleRocksDBForAllConfigs;
+    private final String storePathRootDir;
+
+    public RocksDBTopicConfigManager(BrokerController brokerController, boolean useSingleRocksDB,
+        String storePathRootDir) {
+        super(brokerController, false);
+
+        this.useSingleRocksDBForAllConfigs = useSingleRocksDB;
+        this.storePathRootDir = StringUtils.isBlank(storePathRootDir) ?
+            brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+
+        long flushInterval = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+        CompressionType compressionType =
+            CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+        String rocksDBPath = rocksdbConfigFilePath(storePathRootDir, useSingleRocksDB);
+
+        this.rocksDBConfigManager = useSingleRocksDB ? new RocksDBConfigManager(rocksDBPath, flushInterval,
+            compressionType, TOPIC_COLUMN_FAMILY, VERSION_COLUMN_FAMILY) : new RocksDBConfigManager(rocksDBPath,
+            flushInterval, compressionType);
+    }
+
+    public RocksDBTopicConfigManager(BrokerController brokerController, boolean useSingleRocksDBForAllConfigs) {
+        this(brokerController, useSingleRocksDBForAllConfigs, null);
+    }
 
     public RocksDBTopicConfigManager(BrokerController brokerController) {
-        super(brokerController, false);
-        this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
-            CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
+        this(brokerController, brokerController.getBrokerConfig().isUseSingleRocksDBForAllConfigs(), null);
     }
 
     @Override
@@ -48,6 +74,9 @@
         if (!loadDataVersion() || !loadTopicConfig()) {
             return false;
         }
+        if (useSingleRocksDBForAllConfigs) {
+            migrateFromSeparateRocksDBs();
+        }
         this.init();
         return true;
     }
@@ -114,7 +143,7 @@
         try {
             byte[] keyBytes = topicName.getBytes(DataConverter.CHARSET_UTF8);
             byte[] valueBytes = JSON.toJSONBytes(topicConfig, SerializerFeature.BrowserCompatible);
-            this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes);
+            this.rocksDBConfigManager.put(keyBytes, valueBytes);
         } catch (Exception e) {
             log.error("kv put topic Failed, {}", topicConfig.toString(), e);
         }
@@ -144,10 +173,21 @@
         super.persist();
     }
 
-    public String rocksdbConfigFilePath() {
-        return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator;
+    public String rocksdbConfigFilePath(String storePathRootDir, boolean useSingleRocksDBForAllConfigs) {
+        // A blank root dir falls back to the one configured for the message store.
+        String rootDir = StringUtils.isBlank(storePathRootDir) ?
+            brokerController.getMessageStoreConfig().getStorePathRootDir() : storePathRootDir;
+        // Unified mode uses the "metadata" directory; separate mode uses "topics".
+        String dbDirName = useSingleRocksDBForAllConfigs ? "metadata" : "topics";
+        Path configDir = Paths.get(rootDir).resolve("config");
+
+        return configDir.resolve(dbDirName).toString();
     }
 
+    @Override
+    public String configFilePath() {
+        return BrokerPathConfigHelper.getTopicConfigPath(this.storePathRootDir);
+    }
 
     @Override
     public DataVersion getDataVersion() {
@@ -163,4 +203,94 @@
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Migrates data from the separate RocksDB instance into the unified RocksDB. It runs after the unified data
+     * has been loaded, whenever useSingleRocksDBForAllConfigs is enabled, and does nothing if the separate
+     * RocksDB path does not exist. Otherwise it opens that store in read-only mode and imports its contents
+     * only when its data version counter is greater than the unified one.
+     */
+    private void migrateFromSeparateRocksDBs() {
+        String separateRocksDBPath = rocksdbConfigFilePath(this.storePathRootDir, false);
+
+        // No separate RocksDB directory means there is nothing to migrate
+        if (!UtilAll.isPathExists(separateRocksDBPath)) {
+            log.info("Separate RocksDB for topics does not exist at {}, no migration needed", separateRocksDBPath);
+            return;
+        }
+
+        log.info("Starting migration from separate RocksDB at {} to unified RocksDB", separateRocksDBPath);
+
+        // Open separate RocksDB in read-only mode; it stays null until constructed so finally can skip stop()
+        RocksDBConfigManager separateRocksDBConfigManager = null;
+        try {
+            long memTableFlushIntervalMs = brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs();
+            CompressionType compressionType = CompressionType.getCompressionType(
+                brokerController.getMessageStoreConfig().getRocksdbCompressionType());
+
+            separateRocksDBConfigManager = new RocksDBConfigManager(separateRocksDBPath, memTableFlushIntervalMs,
+                compressionType);
+
+            // Initialize in read-only mode
+            if (!separateRocksDBConfigManager.init(true)) {
+                log.error("Failed to initialize separate RocksDB in read-only mode");
+                return;
+            }
+
+            // Load data version from separate RocksDB
+            if (!separateRocksDBConfigManager.loadDataVersion()) {
+                log.error("Failed to load data version from separate RocksDB");
+                return;
+            }
+
+            DataVersion separateDataVersion = separateRocksDBConfigManager.getKvDataVersion();
+            DataVersion unifiedDataVersion = this.getDataVersion();
+
+            log.info("Comparing data versions - Separate: {}, Unified: {}", separateDataVersion, unifiedDataVersion);
+
+            // Only the version counters are compared; import when the separate store is strictly ahead
+            if (separateDataVersion.getCounter().get() > unifiedDataVersion.getCounter().get()) {
+                log.info("Separate RocksDB has newer data, importing...");
+
+                // Load topic configs from separate RocksDB
+                if (separateRocksDBConfigManager.loadData(this::importTopicConfig)) {
+                    log.info("Successfully imported topic configs from separate RocksDB");
+
+                    this.getDataVersion().assignNewOne(separateDataVersion);
+                    this.getDataVersion().nextVersion(); // Make it one version higher
+                    updateDataVersion();
+                    log.info("Updated unified data version to {}", this.getDataVersion());
+                } else {
+                    log.error("Failed to import topic configs from separate RocksDB");
+                }
+            } else {
+                log.info("Unified RocksDB is already up-to-date, no migration needed");
+            }
+        } catch (Exception e) {
+            log.error("Error during migration from separate RocksDB", e);
+        } finally {
+            if (separateRocksDBConfigManager != null) {
+                try {
+                    separateRocksDBConfigManager.stop();
+                } catch (Exception e) {
+                    log.warn("Error stopping separate RocksDB config manager", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Import a topic config from the separate RocksDB during migration
+     *
+     * @param key  The topic name bytes
+     * @param body The topic config data bytes
+     */
+    private void importTopicConfig(byte[] key, byte[] body) {
+        try {
+            decodeTopicConfig(key, body);
+            this.rocksDBConfigManager.put(key, body);
+        } catch (Exception e) {
+            log.error("Error importing topic config {}", new String(key, DataConverter.CHARSET_UTF8), e);
+        }
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index e1e1cb4..3eee9fc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.offset;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,9 +27,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Strings;
-
 import java.util.function.Function;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -217,8 +215,7 @@
             }
         }
         if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep() == 0) {
-            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            dataVersion.nextVersion(stateMachineVersion);
+            updateDataVersion();
         }
     }
 
@@ -378,6 +375,12 @@
         return dataVersion;
     }
 
+    public void updateDataVersion() {
+        boolean hasStore = brokerController.getMessageStore() != null;
+        long stateMachineVersion = hasStore ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+        dataVersion.nextVersion(stateMachineVersion);
+    }
+
     public void setDataVersion(DataVersion dataVersion) {
         this.dataVersion = dataVersion;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index ed46dfd..f59651f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -676,7 +676,7 @@
     public String encode(final boolean prettyFormat) {
         TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
         topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
-        topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
+        topicConfigSerializeWrapper.setDataVersion(getDataVersion());
         return topicConfigSerializeWrapper.toJson(prettyFormat);
     }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManagerMigrationTest.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManagerMigrationTest.java
new file mode 100644
index 0000000..697316e
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManagerMigrationTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v1;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RocksDBConsumerOffsetManagerMigrationTest {
+
+    private static final String TEST_GROUP = "TestGroup";
+    private static final String TEST_TOPIC = "TestTopic";
+    private static final String TEST_KEY = TEST_TOPIC + "@" + TEST_GROUP;
+
+    private BrokerController brokerController;
+    private String storePath;
+    private String separateRocksDBPath;
+    private String unifiedRocksDBPath;
+
+    @Before
+    public void init() {
+        brokerController = Mockito.mock(BrokerController.class);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        storePath = System.getProperty("java.io.tmpdir") + File.separator + "rocketmq-test-" + java.util.UUID.randomUUID();
+        messageStoreConfig.setStorePathRootDir(storePath);
+        Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setConsumerOffsetUpdateVersionStep(1);
+        Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        // Directories of the per-manager store and of the shared "metadata" store under the test root
+        separateRocksDBPath = storePath + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
+        unifiedRocksDBPath = storePath + File.separator + "config" + File.separator + "metadata" + File.separator;
+
+        // Create directories
+        UtilAll.ensureDirOK(separateRocksDBPath);
+        UtilAll.ensureDirOK(unifiedRocksDBPath);
+    }
+
+    @After
+    public void destroy() {
+        // Clean up test directories
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    @Test
+    public void testMigrationFromSeparateToUnifiedRocksDB() {
+        // Scenario: offsets written in separate mode must be visible after switching to unified mode
+        // First, create data in separate RocksDB mode
+        RocksDBConsumerOffsetManager separateManager = new RocksDBConsumerOffsetManager(brokerController, false);
+        separateManager.load();
+
+        // Add some consumer offsets
+        separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 0, 100L);
+        separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 1, 200L);
+        separateManager.persist();
+        separateManager.stop();
+
+        // Now create unified RocksDB manager which should migrate data
+        RocksDBConsumerOffsetManager unifiedManager = new RocksDBConsumerOffsetManager(brokerController, true);
+        boolean loaded = unifiedManager.load();
+        Assert.assertTrue("Unified manager should load successfully", loaded);
+
+        // Verify that data was migrated
+        ConcurrentMap<Integer, Long> migratedOffsetMap = unifiedManager.getOffsetTable().get(TEST_KEY);
+        Assert.assertNotNull("Consumer offset should be migrated", migratedOffsetMap);
+        Assert.assertEquals("Offset for queue 0 should match", Long.valueOf(100L), migratedOffsetMap.get(0));
+        Assert.assertEquals("Offset for queue 1 should match", Long.valueOf(200L), migratedOffsetMap.get(1));
+
+        unifiedManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 0, 300L);
+        unifiedManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 1, 400L);
+        unifiedManager.persist();
+        unifiedManager.stop();
+
+        // reload unified RocksDB manager which should not migrate data
+        unifiedManager = new RocksDBConsumerOffsetManager(brokerController, true);
+        unifiedManager.load();
+
+        // Verify that the offsets written in unified mode survive the reload (no re-migration of old values)
+        migratedOffsetMap = unifiedManager.getOffsetTable().get(TEST_KEY);
+        Assert.assertEquals("Offset for queue 0 should match", Long.valueOf(300L), migratedOffsetMap.get(0));
+        Assert.assertEquals("Offset for queue 1 should match", Long.valueOf(400L), migratedOffsetMap.get(1));
+        unifiedManager.stop();
+    }
+
+    @Test
+    public void testMigrationWithNoSeparateRocksDB() {
+        // Scenario: unified mode must load cleanly when there is nothing to migrate
+        // Ensure separate RocksDB doesn't exist
+        UtilAll.deleteFile(new File(separateRocksDBPath));
+
+        // Create unified RocksDB manager - should not fail even without separate DB
+        RocksDBConsumerOffsetManager unifiedManager = new RocksDBConsumerOffsetManager(brokerController, true);
+        boolean loaded = unifiedManager.load();
+        Assert.assertTrue("Unified manager should load successfully even without separate DB", loaded);
+
+        unifiedManager.stop();
+    }
+
+    @Test
+    public void testNoMigrationWhenDisabled() {
+        // Scenario: reloading in separate mode must leave the persisted data version untouched
+        // Create data in separate RocksDB mode
+        RocksDBConsumerOffsetManager separateManager = new RocksDBConsumerOffsetManager(brokerController, false);
+        separateManager.load();
+
+        separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 0, 100L);
+        separateManager.commitOffset("client", TEST_GROUP, TEST_TOPIC, 1, 200L);
+        separateManager.persist();
+        separateManager.stop();
+        // With an update step of 1, the two commits above are expected to bump the counter twice
+        long version = separateManager.getDataVersion().getCounter().get();
+        Assert.assertEquals(2, version);
+
+        // Create another separate manager - should not trigger migration
+        RocksDBConsumerOffsetManager anotherSeparateManager = new RocksDBConsumerOffsetManager(brokerController, false);
+        boolean loaded = anotherSeparateManager.load();
+        Assert.assertTrue("Separate manager should load successfully", loaded);
+
+        anotherSeparateManager.loadDataVersion();
+        Assert.assertEquals(version, anotherSeparateManager.getDataVersion().getCounter().get());
+        anotherSeparateManager.stop();
+    }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManagerMigrationTest.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManagerMigrationTest.java
new file mode 100644
index 0000000..cb23c9f
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManagerMigrationTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v1;
+
+import java.io.File;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Checks that {@link RocksDBSubscriptionGroupManager} loads in separate ({@code false}) and unified ({@code true}) mode.
+ */
+public class RocksDBSubscriptionGroupManagerMigrationTest {
+
+    private static final String TEST_GROUP = "TestGroup";
+
+    private BrokerController brokerController;
+    private String storePath;
+    private String separateRocksDBPath;
+    private String unifiedRocksDBPath;
+
+    @Before
+    public void init() {
+        brokerController = Mockito.mock(BrokerController.class);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        // Class name + nanoTime: the root must not clash with other tests that also use the "rocketmq-test-" prefix
+        storePath = System.getProperty("java.io.tmpdir") + File.separator + "rocketmq-test-"
+            + getClass().getSimpleName() + "-" + System.nanoTime();
+        messageStoreConfig.setStorePathRootDir(storePath);
+        Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
+        separateRocksDBPath = storePath + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator;
+        unifiedRocksDBPath = storePath + File.separator + "config" + File.separator + "metadata" + File.separator;
+
+        // Create directories
+        UtilAll.ensureDirOK(separateRocksDBPath);
+        UtilAll.ensureDirOK(unifiedRocksDBPath);
+    }
+
+    @After
+    public void destroy() {
+        // Clean up test directories
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    @Test
+    public void testMigrationFromSeparateToUnifiedRocksDB() {
+        // First, create data in separate RocksDB mode
+        RocksDBSubscriptionGroupManager separateManager = new RocksDBSubscriptionGroupManager(brokerController, false);
+        separateManager.load();
+
+        // Add some subscription groups
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(TEST_GROUP);
+        groupConfig.setConsumeEnable(true);
+        groupConfig.setConsumeFromMinEnable(true);
+        groupConfig.setRetryMaxTimes(3);
+        separateManager.updateSubscriptionGroupConfig(groupConfig);
+        separateManager.persist();
+        separateManager.stop();
+
+        {
+            // Now create unified RocksDB manager which should migrate data
+            RocksDBSubscriptionGroupManager unifiedManager = new RocksDBSubscriptionGroupManager(brokerController, true);
+            boolean loaded = unifiedManager.load();
+            Assert.assertTrue("Unified manager should load successfully", loaded);
+
+            // Verify that data was migrated
+            SubscriptionGroupConfig migratedConfig = unifiedManager.findSubscriptionGroupConfig(TEST_GROUP);
+            Assert.assertNotNull("Subscription group should be migrated", migratedConfig);
+            Assert.assertEquals("Group name should match", TEST_GROUP, migratedConfig.getGroupName());
+            Assert.assertEquals("Retry max times should match", 3, migratedConfig.getRetryMaxTimes());
+            Assert.assertTrue("Consume enable should match", migratedConfig.isConsumeEnable());
+            Assert.assertTrue("Consume from min enable should match", migratedConfig.isConsumeFromMinEnable());
+
+            groupConfig.setRetryMaxTimes(4);
+            unifiedManager.updateSubscriptionGroupConfig(groupConfig);
+            unifiedManager.persist();
+            unifiedManager.stop();
+        }
+
+        {
+            // Reopen the unified manager: it must return the value updated above (4), not the originally migrated one (3)
+            RocksDBSubscriptionGroupManager unifiedManager = new RocksDBSubscriptionGroupManager(brokerController, true);
+            boolean loaded = unifiedManager.load();
+            Assert.assertTrue("Unified manager should load successfully", loaded);
+
+            // Verify that the update written through the unified manager is what gets loaded
+            SubscriptionGroupConfig reloadedConfig = unifiedManager.findSubscriptionGroupConfig(TEST_GROUP);
+            Assert.assertNotNull("Subscription group should be migrated", reloadedConfig);
+            Assert.assertEquals("Group name should match", TEST_GROUP, reloadedConfig.getGroupName());
+            Assert.assertEquals("Retry max times should match", 4, reloadedConfig.getRetryMaxTimes());
+            Assert.assertTrue("Consume enable should match", reloadedConfig.isConsumeEnable());
+            Assert.assertTrue("Consume from min enable should match", reloadedConfig.isConsumeFromMinEnable());
+
+            unifiedManager.stop();
+        }
+    }
+
+    @Test
+    public void testMigrationWithNoSeparateRocksDB() {
+        // Ensure separate RocksDB doesn't exist
+        UtilAll.deleteFile(new File(separateRocksDBPath));
+
+        // Create unified RocksDB manager - should not fail even without separate DB
+        RocksDBSubscriptionGroupManager unifiedManager = new RocksDBSubscriptionGroupManager(brokerController, true);
+        boolean loaded = unifiedManager.load();
+        Assert.assertTrue("Unified manager should load successfully even without separate DB", loaded);
+
+        unifiedManager.stop();
+    }
+
+    @Test
+    public void testNoMigrationWhenDisabled() {
+        // Create data in separate RocksDB mode
+        RocksDBSubscriptionGroupManager separateManager = new RocksDBSubscriptionGroupManager(brokerController, false);
+        separateManager.load();
+
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(TEST_GROUP);
+        groupConfig.setConsumeEnable(true);
+        groupConfig.setConsumeFromMinEnable(true);
+        separateManager.putSubscriptionGroupConfig(groupConfig);
+        separateManager.persist();
+        separateManager.stop();
+
+        // Create another separate manager - should not trigger migration
+        RocksDBSubscriptionGroupManager anotherSeparateManager = new RocksDBSubscriptionGroupManager(brokerController, false);
+        boolean loaded = anotherSeparateManager.load();
+        Assert.assertTrue("Separate manager should load successfully", loaded);
+
+        anotherSeparateManager.stop();
+    }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManagerMigrationTest.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManagerMigrationTest.java
new file mode 100644
index 0000000..e7097bf
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManagerMigrationTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v1;
+
+import java.io.File;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Checks that {@link RocksDBTopicConfigManager} loads in separate ({@code false}) and unified ({@code true}) mode.
+ */
+public class RocksDBTopicConfigManagerMigrationTest {
+
+    private static final String TEST_TOPIC = "TestTopic";
+
+    private BrokerController brokerController;
+    private String storePath;
+    private String separateRocksDBPath;
+    private String unifiedRocksDBPath;
+
+    @Before
+    public void init() {
+        brokerController = Mockito.mock(BrokerController.class);
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        // Class name + nanoTime: the root must not clash with other tests that also use the "rocketmq-test-" prefix
+        storePath = System.getProperty("java.io.tmpdir") + File.separator + "rocketmq-test-"
+            + getClass().getSimpleName() + "-" + System.nanoTime();
+        messageStoreConfig.setStorePathRootDir(storePath);
+        Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
+
+        separateRocksDBPath = storePath + File.separator + "config" + File.separator + "topics" + File.separator;
+        unifiedRocksDBPath = storePath + File.separator + "config" + File.separator + "metadata" + File.separator;
+
+        // Create directories
+        UtilAll.ensureDirOK(separateRocksDBPath);
+        UtilAll.ensureDirOK(unifiedRocksDBPath);
+    }
+
+    @After
+    public void destroy() {
+        // Clean up test directories
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    @Test
+    public void testMigrationFromSeparateToUnifiedRocksDB() {
+        // First, create data in separate RocksDB mode
+        RocksDBTopicConfigManager separateManager = new RocksDBTopicConfigManager(brokerController, false);
+        separateManager.load();
+
+        // Add some topic configs
+        TopicConfig topicConfig = new TopicConfig(TEST_TOPIC, 4, 4);
+        separateManager.updateTopicConfig(topicConfig);
+        separateManager.persist();
+        separateManager.stop();
+
+        {
+            // Now create unified RocksDB manager which should migrate data
+            RocksDBTopicConfigManager unifiedManager = new RocksDBTopicConfigManager(brokerController, true);
+            boolean loaded = unifiedManager.load();
+            Assert.assertTrue("Unified manager should load successfully", loaded);
+
+            // Verify that data was migrated
+            TopicConfig migratedConfig = unifiedManager.selectTopicConfig(TEST_TOPIC);
+            Assert.assertNotNull("Topic config should be migrated", migratedConfig);
+            Assert.assertEquals("Topic name should match", TEST_TOPIC, migratedConfig.getTopicName());
+            Assert.assertEquals("Read queue num should match", 4, migratedConfig.getReadQueueNums());
+            Assert.assertEquals("Write queue num should match", 4, migratedConfig.getWriteQueueNums());
+
+            topicConfig.setReadQueueNums(8);
+            topicConfig.setWriteQueueNums(8);
+            unifiedManager.updateTopicConfig(topicConfig);
+            unifiedManager.persist();
+            unifiedManager.stop();
+        }
+
+        {
+            // Reopen the unified manager: it must return the queue numbers updated above (8), not the migrated ones (4)
+            RocksDBTopicConfigManager unifiedManager = new RocksDBTopicConfigManager(brokerController, true);
+            boolean loaded = unifiedManager.load();
+            Assert.assertTrue("Unified manager should load successfully", loaded);
+
+            // Verify that the update written through the unified manager is what gets loaded
+            TopicConfig reloadedConfig = unifiedManager.selectTopicConfig(TEST_TOPIC);
+            Assert.assertNotNull("Topic config should be migrated", reloadedConfig);
+            Assert.assertEquals("Topic name should match", TEST_TOPIC, reloadedConfig.getTopicName());
+            Assert.assertEquals("Read queue num should match", 8, reloadedConfig.getReadQueueNums());
+            Assert.assertEquals("Write queue num should match", 8, reloadedConfig.getWriteQueueNums());
+
+            unifiedManager.stop();
+        }
+    }
+
+    @Test
+    public void testMigrationWithNoSeparateRocksDB() {
+        // Ensure separate RocksDB doesn't exist
+        UtilAll.deleteFile(new File(separateRocksDBPath));
+
+        // Create unified RocksDB manager - should not fail even without separate DB
+        RocksDBTopicConfigManager unifiedManager = new RocksDBTopicConfigManager(brokerController, true);
+        boolean loaded = unifiedManager.load();
+        Assert.assertTrue("Unified manager should load successfully even without separate DB", loaded);
+
+        unifiedManager.stop();
+    }
+
+    @Test
+    public void testNoMigrationWhenDisabled() {
+        // Create data in separate RocksDB mode
+        RocksDBTopicConfigManager separateManager = new RocksDBTopicConfigManager(brokerController, false);
+        separateManager.load();
+
+        TopicConfig topicConfig = new TopicConfig(TEST_TOPIC, 4, 4);
+        separateManager.putTopicConfig(topicConfig);
+        separateManager.persist();
+        separateManager.stop();
+
+        // Create another separate manager - should not trigger migration
+        RocksDBTopicConfigManager anotherSeparateManager = new RocksDBTopicConfigManager(brokerController, false);
+        boolean loaded = anotherSeparateManager.load();
+        Assert.assertTrue("Separate manager should load successfully", loaded);
+
+        anotherSeparateManager.stop();
+    }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
index 191850f..1227d33 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
@@ -22,7 +22,7 @@
 
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
-import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
 import org.junit.Assert;
@@ -49,6 +49,7 @@
         brokerController = Mockito.mock(BrokerController.class);
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
 
         consumerOffsetManager = new RocksDBConsumerOffsetManager(brokerController);
         consumerOffsetManager.load();
@@ -110,6 +111,6 @@
     }
 
     private boolean notToBeExecuted() {
-        return MixAll.isMac();
+        return false; // no longer tied to MixAll.isMac(): reports false on every platform
     }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
index aa5003f..a2dbf60 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
@@ -50,7 +50,7 @@
     @Before
     public void setUp() {
         brokerController = Mockito.mock(BrokerController.class);
-        when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class));
+        when(brokerController.getMessageStoreConfig()).thenReturn(new MessageStoreConfig()); // real config instead of a mock; presumably so its getters return real defaults
         when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
         offsetManager = new RocksDBConsumerOffsetManager(brokerController);
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 04828da..a464355 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -471,6 +471,12 @@
      */
     private String configManagerVersion = ConfigManagerVersion.V1.getVersion();
 
+    /**
+     * Whether to use a single RocksDB instance with multiple column families for all configs
+     * instead of separate RocksDB instances for Topic, Group, and Offset configs
+     */
+    private boolean useSingleRocksDBForAllConfigs = false;
+
     private boolean allowRecallWhenBrokerNotWriteable = true;
 
     private boolean recallMessageEnable = false;
@@ -2116,6 +2122,14 @@
         this.configManagerVersion = configManagerVersion;
     }
 
+    public boolean isUseSingleRocksDBForAllConfigs() {
+        return useSingleRocksDBForAllConfigs; // the field is initialized to false
+    }
+
+    public void setUseSingleRocksDBForAllConfigs(boolean useSingleRocksDBForAllConfigs) {
+        this.useSingleRocksDBForAllConfigs = useSingleRocksDBForAllConfigs; // true: one RocksDB instance with column families for all configs
+    }
+
     public boolean isAllowRecallWhenBrokerNotWriteable() {
         return allowRecallWhenBrokerNotWriteable;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index e087817..08a103b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -18,6 +18,7 @@
 
 import com.google.common.collect.Maps;
 import io.netty.buffer.PooledByteBufAllocator;
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -506,7 +507,9 @@
             //1. close column family handles
             preShutdown();
 
-            this.defaultCFHandle.close();
+            if (this.defaultCFHandle.isOwningHandle()) {
+                this.defaultCFHandle.close();
+            }
 
             //2. close column family options.
             for (final ColumnFamilyOptions opt : this.cfOptions) {
@@ -711,4 +714,22 @@
         } catch (Exception ignored) {
         }
     }
+
+    public void destroy() {
+        recursiveDelete(new File(dbPath)); // removes dbPath and everything under it from disk
+    }
+
+    /** Best-effort recursive delete of {@code file}; a missing or unreadable path no longer throws. */
+    void recursiveDelete(File file) {
+        File[] children = file.listFiles(); // null unless file is a listable directory
+        if (children != null) {
+            for (File child : children) {
+                recursiveDelete(child);
+            }
+        }
+        boolean regularFile = file.isFile(); // must be sampled before the delete below
+        if (file.delete() && regularFile) {
+            LOGGER.info("Delete rocksdb file={}", file.getAbsolutePath());
+        }
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 5fd9bab..e1802fd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -17,47 +17,50 @@
 package org.apache.rocketmq.common.config;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompressionType;
-import org.rocksdb.ReadOptions;
+import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
 
 public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
-    public static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(StandardCharsets.UTF_8);
-    public static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = "forbidden".getBytes(StandardCharsets.UTF_8);
+    public static final Charset CHARSET = StandardCharsets.UTF_8;
+    public static final ConcurrentMap<String, ConfigRocksDBStorage> STORE_MAP = new ConcurrentHashMap<>();
 
-    protected ColumnFamilyHandle kvDataVersionFamilyHandle;
-    protected ColumnFamilyHandle forbiddenFamilyHandle;
-    public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
+    private final ConcurrentHashMap<String, ColumnFamilyHandle> columnFamilyNameHandleMap;
+    private ColumnFamilyOptions columnFamilyOptions;
 
-
-
-    public ConfigRocksDBStorage(final String dbPath) {
-        this(dbPath, false);
-    }
-
-    public ConfigRocksDBStorage(final String dbPath, CompressionType compressionType) {
-        this(dbPath, false);
-        this.compressionType = compressionType;
+    private ConfigRocksDBStorage(final String dbPath, boolean readOnly, CompressionType compressionType) {
+        super(dbPath);
+        this.readOnly = readOnly;
+        if (compressionType != null) { // null leaves this.compressionType as it already is
+            this.compressionType = compressionType;
+        }
+        this.columnFamilyNameHandleMap = new ConcurrentHashMap<>(); // column family name -> handle
     }
 
     public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
-        super(dbPath);
-        this.readOnly = readOnly;
+        this(dbPath, readOnly, null); // null: no compression type override
     }
 
     protected void initOptions() {
         this.options = ConfigHelper.createConfigDBOptions();
+        this.columnFamilyOptions = ConfigHelper.createConfigColumnFamilyOptions(); // shared by all column families of this store
+        this.cfOptions.add(columnFamilyOptions); // presumably so the base class closes it on shutdown - confirm
         super.initOptions();
     }
 
@@ -68,19 +71,20 @@
 
             initOptions();
 
-            final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+            List<byte[]> columnFamilyNames = new ArrayList<>(RocksDB.listColumnFamilies(
+                new Options(options, columnFamilyOptions), dbPath));
+            addIfNotExists(columnFamilyNames, RocksDB.DEFAULT_COLUMN_FAMILY);
 
-            ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigColumnFamilyOptions();
-            this.cfOptions.add(defaultOptions);
-            cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
-            cfDescriptors.add(new ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
-            cfDescriptors.add(new ColumnFamilyDescriptor(FORBIDDEN_COLUMN_FAMILY_NAME, defaultOptions));
-            open(cfDescriptors);
+            List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+            for (byte[] columnFamilyName : columnFamilyNames) {
+                cfDescriptors.add(new ColumnFamilyDescriptor(columnFamilyName, columnFamilyOptions));
+            }
 
-            this.defaultCFHandle = cfHandles.get(0);
-            this.kvDataVersionFamilyHandle = cfHandles.get(1);
-            this.forbiddenFamilyHandle = cfHandles.get(2);
-
+            this.open(cfDescriptors);
+            for (int i = 0; i < columnFamilyNames.size(); i++) {
+                columnFamilyNameHandleMap.put(new String(columnFamilyNames.get(i), CHARSET), cfHandles.get(i));
+            }
+            this.defaultCFHandle = columnFamilyNameHandleMap.get(new String(RocksDB.DEFAULT_COLUMN_FAMILY, CHARSET));
         } catch (final Exception e) {
             AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e);
             return false;
@@ -90,45 +94,16 @@
 
     @Override
     protected void preShutdown() {
-        this.kvDataVersionFamilyHandle.close();
-        this.forbiddenFamilyHandle.close();
+        for (final ColumnFamilyHandle columnFamilyHandle : this.columnFamilyNameHandleMap.values()) {
+            if (columnFamilyHandle.isOwningHandle()) { // only close handles that still own their native object
+                columnFamilyHandle.close();
+            }
+        }
     }
 
-    public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
-        put(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes, keyLen, valueBytes, valueBytes.length);
-    }
-
-    public void put(final ByteBuffer keyBB, final ByteBuffer valueBB) throws Exception {
-        put(this.defaultCFHandle, this.ableWalWriteOptions, keyBB, valueBB);
-    }
-
-    public byte[] get(final byte[] keyBytes) throws Exception {
-        return get(this.defaultCFHandle, this.totalOrderReadOptions, keyBytes);
-    }
-
-    public void updateKvDataVersion(final byte[] valueBytes) throws Exception {
-        put(this.kvDataVersionFamilyHandle, this.ableWalWriteOptions, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length, valueBytes, valueBytes.length);
-    }
-
-    public byte[] getKvDataVersion() throws Exception {
-        return get(this.kvDataVersionFamilyHandle, this.totalOrderReadOptions, KV_DATA_VERSION_KEY);
-    }
-
-    public void updateForbidden(final byte[] keyBytes, final byte[] valueBytes) throws Exception {
-        put(this.forbiddenFamilyHandle, this.ableWalWriteOptions, keyBytes, keyBytes.length, valueBytes, valueBytes.length);
-    }
-
-    public byte[] getForbidden(final byte[] keyBytes) throws Exception {
-        return get(this.forbiddenFamilyHandle, this.totalOrderReadOptions, keyBytes);
-    }
-
-    public void delete(final byte[] keyBytes) throws Exception {
-        delete(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes);
-    }
-
-    public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList, final List<byte[]> keys) throws
-        RocksDBException {
-        return multiGet(this.totalOrderReadOptions, cfhList, keys);
+    // batch operations
+    public void writeBatchPutOperation(String cf, WriteBatch writeBatch, final byte[] key, final byte[] value) throws RocksDBException {
+        writeBatch.put(getOrCreateColumnFamily(cf), key, value); // adds the put to writeBatch; column family cf is created if missing
     }
 
     public void batchPut(final WriteBatch batch) throws RocksDBException {
@@ -139,15 +114,100 @@
         batchPut(this.ableWalWriteOptions, batch);
     }
 
+
+    // operations with the specified cf
+    public void put(String cf, final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
+        put(getOrCreateColumnFamily(cf), this.ableWalWriteOptions, keyBytes, keyLen, valueBytes, valueBytes.length); // cf is created on first use
+    }
+
+    public void put(String cf, final ByteBuffer keyBB, final ByteBuffer valueBB) throws Exception {
+        put(getOrCreateColumnFamily(cf), this.ableWalWriteOptions, keyBB, valueBB); // cf is created on first use
+    }
+
+    // Reads keyBytes from column family cf.
+    // A read never creates a column family: when no handle is registered for cf,
+    // null is returned instead.
+    public byte[] get(String cf, final byte[] keyBytes) throws Exception {
+        final ColumnFamilyHandle handle = columnFamilyNameHandleMap.get(cf);
+        return handle == null ? null : get(handle, this.totalOrderReadOptions, keyBytes);
+    }
+
+    // Deletes keyBytes from column family cf; a no-op when no handle is registered for cf.
+    public void delete(String cf, final byte[] keyBytes) throws Exception {
+        final ColumnFamilyHandle handle = columnFamilyNameHandleMap.get(cf);
+        if (handle != null) {
+            delete(handle, this.ableWalWriteOptions, keyBytes);
+        }
+    }
+
+    // Visits every key/value pair of column family cf in iterator order, passing each to biConsumer.
+    public void iterate(final String cf, BiConsumer<byte[], byte[]> biConsumer) throws RocksDBException {
+        if (!hold()) {
+            LOGGER.warn("RocksDBKvStore[path={}] has been shut down", dbPath);
+            return;
+        }
+        final ColumnFamilyHandle handle = columnFamilyNameHandleMap.get(cf);
+        if (handle != null) { // a cf without a registered handle has nothing to visit
+            try (RocksIterator cursor = this.db.newIterator(handle)) {
+                for (cursor.seekToFirst(); cursor.isValid(); cursor.next()) {
+                    biConsumer.accept(cursor.key(), cursor.value());
+                }
+                cursor.status(); // raises RocksDBException if the scan ended on an error
+            }
+        }
+    }
+
     public RocksIterator iterator() {
         return this.db.newIterator(this.defaultCFHandle, this.totalOrderReadOptions);
     }
 
-    public RocksIterator forbiddenIterator() {
-        return this.db.newIterator(this.forbiddenFamilyHandle, this.totalOrderReadOptions);
+    public ColumnFamilyHandle getOrCreateColumnFamily(String cf) throws RocksDBException {
+        if (!columnFamilyNameHandleMap.containsKey(cf)) { // known names skip the lock entirely
+            if (readOnly) { // a read-only store never creates a missing column family
+                String errInfo = String.format("RocksDBKvStore[path=%s] is open as read-only", dbPath);
+                LOGGER.warn(errInfo);
+                throw new RocksDBException(errInfo);
+            }
+            synchronized (this) {
+                if (!columnFamilyNameHandleMap.containsKey(cf)) { // re-check: another thread may have created cf meanwhile
+                    ColumnFamilyDescriptor columnFamilyDescriptor =
+                        new ColumnFamilyDescriptor(cf.getBytes(CHARSET), columnFamilyOptions);
+                    ColumnFamilyHandle columnFamilyHandle = db.createColumnFamily(columnFamilyDescriptor);
+                    columnFamilyNameHandleMap.putIfAbsent(cf, columnFamilyHandle);
+                    cfHandles.add(columnFamilyHandle); // also tracked in cfHandles next to the handles obtained at open
+                }
+            }
+        }
+        return columnFamilyNameHandleMap.get(cf);
     }
 
-    public RocksIterator iterator(ReadOptions readOptions) {
-        return this.db.newIterator(this.defaultCFHandle, readOptions);
+    public void addIfNotExists(List<byte[]> columnFamilyNames, byte[] byteArray) {
+        if (columnFamilyNames.stream().noneMatch(array -> Arrays.equals(array, byteArray))) { // names are compared by content
+            columnFamilyNames.add(byteArray); // appended at the end, so existing positions are unchanged
+        }
+    }
+
+    public static ConfigRocksDBStorage getStore(String path, boolean readOnly, CompressionType compressionType) {
+        return ConcurrentHashMapUtils.computeIfAbsent(STORE_MAP, path, // stores are cached in STORE_MAP by path
+            k -> new ConfigRocksDBStorage(path, readOnly, compressionType)); // NOTE(review): presumably skipped when path is cached, so later readOnly/compressionType are ignored - confirm
+    }
+
+    public static ConfigRocksDBStorage getStore(String path, boolean readOnly) {
+        return getStore(path, readOnly, null); // null: no compression type override
+    }
+
+    public static void shutdown(String path) {
+        ConfigRocksDBStorage kvStore = STORE_MAP.remove(path); // drop the cached entry for path, if any
+        if (kvStore != null) {
+            kvStore.shutdown();
+        }
+    }
+
+    public static void destroy(String path) {
+        ConfigRocksDBStorage kvStore = STORE_MAP.remove(path); // drop the cached entry for path, if any
+        if (kvStore != null) {
+            kvStore.shutdown(); // shut the store down before its files are removed
+            kvStore.destroy(); // instance destroy(): recursively deletes the store's dbPath
+        }
     }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index 48bc163..94899fc 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -192,8 +192,12 @@
             return loadConsumerOffsets(path);
         }
 
-        ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
-        configRocksDBStorage.start();
+        ConfigRocksDBStorage configRocksDBStorage = ConfigRocksDBStorage.getStore(path, true);
+        if (!configRocksDBStorage.start()) {
+            System.out.print("Failed to initialize ConfigRocksDBStorage.\n");
+            return null;
+        }
+
         RocksIterator iterator = configRocksDBStorage.iterator();
         try {
             final Map<String, JSONObject> configMap = new HashMap<>();
@@ -208,10 +212,17 @@
                 configTable.put(name, jsonObject);
                 iterator.next();
             }
-            byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion();
-            if (kvDataVersion != null) {
-                configMap.put("dataVersion",
-                    JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
+
+            // Try to get data version
+            try {
+                byte[] kvDataVersion = configRocksDBStorage.get("kvDataVersion",
+                    "kvDataVersionKey".getBytes(DataConverter.CHARSET_UTF8));
+                if (kvDataVersion != null) {
+                    configMap.put("dataVersion",
+                        JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8)));
+                }
+            } catch (Exception e) {
+                // Best effort: any failure to read or parse the data version is ignored and the export continues without it
             }
 
             if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS.equals(configType)) {
@@ -224,7 +235,7 @@
         } catch (Exception e) {
             System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n");
         } finally {
-            configRocksDBStorage.shutdown();
+            ConfigRocksDBStorage.shutdown(path);
         }
         return null;
     }
@@ -284,8 +295,12 @@
     }
 
     private static Map<String, JSONObject> loadConsumerOffsets(String path) {
-        ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true);
-        configRocksDBStorage.start();
+        ConfigRocksDBStorage configRocksDBStorage = ConfigRocksDBStorage.getStore(path, true);
+        if (!configRocksDBStorage.start()) {
+            System.out.print("Failed to initialize ConfigRocksDBStorage for consumer offsets.\n");
+            return null;
+        }
+
         RocksIterator iterator = configRocksDBStorage.iterator();
         try {
             final Map<String, JSONObject> configMap = new HashMap<>();
@@ -305,7 +320,7 @@
         } catch (Exception e) {
             System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=consumerOffsets, " + e.getMessage() + "\n");
         } finally {
-            configRocksDBStorage.shutdown();
+            ConfigRocksDBStorage.shutdown(path);
         }
         return null;
     }