[ISSUE #8698] Remove batch write in kv cq store and update rocksdb cq check tool (#8739)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 863f16e..80f3f44 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -215,6 +215,7 @@
 import org.apache.rocketmq.store.RocksDBMessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -470,16 +471,21 @@
         String requestTopic = requestHeader.getTopic();
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         response.setCode(ResponseCode.SUCCESS);
-
-        DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
-        RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
-        if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+        MessageStore messageStore = brokerController.getMessageStore();
+        DefaultMessageStore defaultMessageStore;
+        if (messageStore instanceof AbstractPluginMessageStore) {
+            defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
+        } else {
+            defaultMessageStore = (DefaultMessageStore) messageStore;
+        }
+        RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore();
+        if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
             response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
             return response;
         }
 
-        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
-        StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable();
+        StringBuilder diffResult = new StringBuilder();
         try {
             if (StringUtils.isNotBlank(requestTopic)) {
                 processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false);
@@ -516,15 +522,15 @@
                 Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
                 Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
                 if (fileCqUnit == null || kvCqUnit == null) {
-                    diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv  : %s  \n file: %s  \n",
+                    diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv   : %s  \n file : %s  \n",
                         topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
                     return;
                 }
                 if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
-                    String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s  \n  kv: %s",
+                    String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file : %s  \n  kv : %s \n",
                         topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
                     LOGGER.error(diffInfo);
-                    diffResult.append(diffInfo).append("\n");
+                    diffResult.append(diffInfo).append(System.lineSeparator());
                     return;
                 }
             }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index c077831..6853128 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -428,15 +428,15 @@
 
     private boolean rocksdbCQDoubleWriteEnable = false;
 
-    private boolean enableBatchWriteKvCq = true;
+    private int batchWriteKvCqSize = 16;
 
 
-    public boolean isEnableBatchWriteKvCq() {
-        return enableBatchWriteKvCq;
+    public int getBatchWriteKvCqSize() {
+        return batchWriteKvCqSize;
     }
 
-    public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) {
-        this.enableBatchWriteKvCq = enableBatchWriteKvCq;
+    public void setBatchWriteKvCqSize(int batchWriteKvCqSize) {
+        this.batchWriteKvCqSize = batchWriteKvCqSize;
     }
 
     public boolean isRocksdbCQDoubleWriteEnable() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 2f2ce98..2401257 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -661,4 +661,8 @@
     public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
         next.notifyMessageArriveIfNecessary(dispatchRequest);
     }
+
+    public MessageStore getNext() {
+        return next;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 34c6d2f..c889ae7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -55,7 +55,7 @@
     public static final byte CTRL_1 = '\u0001';
     public static final byte CTRL_2 = '\u0002';
 
-    private static final int BATCH_SIZE = 16;
+    private final int batchSize;
     public static final int MAX_KEY_LEN = 300;
 
     private final ScheduledExecutorService scheduledExecutorService;
@@ -77,8 +77,6 @@
     private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap;
     private volatile boolean isCQError = false;
 
-    private boolean enableBatchWriteKvCq;
-
     public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
         super(messageStore);
 
@@ -88,11 +86,11 @@
         this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore);
 
         this.writeBatch = new WriteBatch();
-        this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq();
-        this.bufferDRList = new ArrayList(BATCH_SIZE);
-        this.cqBBPairList = new ArrayList(BATCH_SIZE);
-        this.offsetBBPairList = new ArrayList(BATCH_SIZE);
-        for (int i = 0; i < BATCH_SIZE; i++) {
+        this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
+        this.bufferDRList = new ArrayList(batchSize);
+        this.cqBBPairList = new ArrayList(batchSize);
+        this.offsetBBPairList = new ArrayList(batchSize);
+        for (int i = 0; i < batchSize; i++) {
             this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
             this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
         }
@@ -166,12 +164,13 @@
 
     @Override
     public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException {
+        if (request == null || this.bufferDRList.size() >= batchSize) {
+            putMessagePosition();
+        }
+
         if (request != null) {
             this.bufferDRList.add(request);
         }
-        if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) {
-            putMessagePosition();
-        }
     }
 
     public void putMessagePosition() throws RocksDBException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 43e4259..313a777 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -104,6 +104,7 @@
 import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
 import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
 import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
+import org.apache.rocketmq.tools.command.queue.CheckRocksdbCqWriteProgressCommand;
 import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
 import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
 import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
@@ -304,6 +305,7 @@
         initCommand(new ListAclSubCommand());
         initCommand(new CopyAclsSubCommand());
         initCommand(new RocksDBConfigToJsonCommand());
+        initCommand(new CheckRocksdbCqWriteProgressCommand());
     }
 
     private static void printHelp() {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
index 82dcb74..d18a24e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
@@ -34,7 +34,7 @@
 
     @Override
     public String commandName() {
-        return "checkRocksdbCqWriteProgressCommandCommand";
+        return "checkRocksdbCqWriteProgress";
     }
 
     @Override
@@ -82,9 +82,9 @@
                 String brokerAddr = brokerData.getBrokerAddrs().get(0L);
                 CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
                 if (StringUtils.isNotBlank(topic)) {
-                    System.out.printf(body.getDiffResult());
+                    System.out.print(body.getDiffResult());
                 } else {
-                    System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult());
+                    System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult());
                 }
             }