add LMQ prefix
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
index fd8d718..e9f87e6 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
@@ -80,7 +80,7 @@
String brokerAddress = tmpBrokerAddressMap.get(queue.getBrokerName());
QueueOffset queueOffset = each.getValue();
UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader();
- updateHeader.setTopic(StringUtils.replace(queue.getQueueName(), "/","%"));
+ updateHeader.setTopic(MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%"));
updateHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
updateHeader.setQueueId((int) queue.getQueueId());
updateHeader.setCommitOffset(queueOffset.getOffset());
@@ -111,7 +111,7 @@
map.put(queue, queueOffset);
try {
QueryConsumerOffsetRequestHeader queryHeader = new QueryConsumerOffsetRequestHeader();
- queryHeader.setTopic(StringUtils.replace(queue.getQueueName(), "/","%"));
+ queryHeader.setTopic(MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%"));
queryHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
queryHeader.setQueueId((int) queue.getQueueId());
long offset = defaultMQPullConsumer