[ISSUE #8698] Remove batch write in kv cq store and update rocksdb cq check tool (#8739)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 863f16e..80f3f44 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -215,6 +215,7 @@
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -470,16 +471,21 @@
String requestTopic = requestHeader.getTopic();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
-
- DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
- RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
- if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
+ MessageStore messageStore = brokerController.getMessageStore();
+ DefaultMessageStore defaultMessageStore;
+ if (messageStore instanceof AbstractPluginMessageStore) {
+ defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
+ } else {
+ defaultMessageStore = (DefaultMessageStore) messageStore;
+ }
+ RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore();
+ if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
return response;
}
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
- StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable();
+ StringBuilder diffResult = new StringBuilder();
try {
if (StringUtils.isNotBlank(requestTopic)) {
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false);
@@ -516,15 +522,15 @@
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
if (fileCqUnit == null || kvCqUnit == null) {
- diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n",
+ diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n",
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
return;
}
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
- String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s",
+ String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file : %s \n kv : %s \n",
topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
LOGGER.error(diffInfo);
- diffResult.append(diffInfo).append("\n");
+ diffResult.append(diffInfo).append(System.lineSeparator());
return;
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index c077831..6853128 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -428,15 +428,15 @@
private boolean rocksdbCQDoubleWriteEnable = false;
- private boolean enableBatchWriteKvCq = true;
+ private int batchWriteKvCqSize = 16;
- public boolean isEnableBatchWriteKvCq() {
- return enableBatchWriteKvCq;
+ public int getBatchWriteKvCqSize() {
+ return batchWriteKvCqSize;
}
- public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) {
- this.enableBatchWriteKvCq = enableBatchWriteKvCq;
+ public void setBatchWriteKvCqSize(int batchWriteKvCqSize) {
+ this.batchWriteKvCqSize = batchWriteKvCqSize;
}
public boolean isRocksdbCQDoubleWriteEnable() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 2f2ce98..2401257 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -661,4 +661,8 @@
public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
next.notifyMessageArriveIfNecessary(dispatchRequest);
}
+
+ public MessageStore getNext() {
+ return next;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 34c6d2f..c889ae7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -55,7 +55,7 @@
public static final byte CTRL_1 = '\u0001';
public static final byte CTRL_2 = '\u0002';
- private static final int BATCH_SIZE = 16;
+ private final int batchSize;
public static final int MAX_KEY_LEN = 300;
private final ScheduledExecutorService scheduledExecutorService;
@@ -77,8 +77,6 @@
private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap;
private volatile boolean isCQError = false;
- private boolean enableBatchWriteKvCq;
-
public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
super(messageStore);
@@ -88,11 +86,11 @@
this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore);
this.writeBatch = new WriteBatch();
- this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq();
- this.bufferDRList = new ArrayList(BATCH_SIZE);
- this.cqBBPairList = new ArrayList(BATCH_SIZE);
- this.offsetBBPairList = new ArrayList(BATCH_SIZE);
- for (int i = 0; i < BATCH_SIZE; i++) {
+ this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
+ this.bufferDRList = new ArrayList(batchSize);
+ this.cqBBPairList = new ArrayList(batchSize);
+ this.offsetBBPairList = new ArrayList(batchSize);
+ for (int i = 0; i < batchSize; i++) {
this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
}
@@ -166,12 +164,13 @@
@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException {
+ if (request == null || this.bufferDRList.size() >= batchSize) {
+ putMessagePosition();
+ }
+
if (request != null) {
this.bufferDRList.add(request);
}
- if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) {
- putMessagePosition();
- }
}
public void putMessagePosition() throws RocksDBException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 43e4259..313a777 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -104,6 +104,7 @@
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
+import org.apache.rocketmq.tools.command.queue.CheckRocksdbCqWriteProgressCommand;
import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
@@ -304,6 +305,7 @@
initCommand(new ListAclSubCommand());
initCommand(new CopyAclsSubCommand());
initCommand(new RocksDBConfigToJsonCommand());
+ initCommand(new CheckRocksdbCqWriteProgressCommand());
}
private static void printHelp() {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
index 82dcb74..d18a24e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java
@@ -34,7 +34,7 @@
@Override
public String commandName() {
- return "checkRocksdbCqWriteProgressCommandCommand";
+ return "checkRocksdbCqWriteProgress";
}
@Override
@@ -82,9 +82,9 @@
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
if (StringUtils.isNotBlank(topic)) {
- System.out.printf(body.getDiffResult());
+ System.out.print(body.getDiffResult());
} else {
- System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult());
+ System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult());
}
}