Merge pull request #218 from apache/develop

Develop
diff --git a/README.md b/README.md
index 61009c4..a44b294 100644
--- a/README.md
+++ b/README.md
@@ -43,13 +43,15 @@
 ```
 Some important configuration items in the **service.conf** configuration file 
 
-| **Config Key**        | **Instruction**          |
-|-----------------------|--------------------------|
-| username              | used for auth            |
-| secretKey             | used for auth            |
-| NAMESRV_ADDR          | specify namesrv address  |
-| eventNotifyRetryTopic | notify event retry topic |
-| clientRetryTopic      | client retry topic       |
+| **Config Key**        | **Instruction**                                               |
+|-----------------------|---------------------------------------------------------------|
+| username              | used for auth                                                 |
+| secretKey             | used for auth                                                 |
+| NAMESRV_ADDR          | specify namesrv address                                       |
+| eventNotifyRetryTopic | notify event retry topic                                      |
+| clientRetryTopic      | client retry topic                                            |
+| metaAddr              | meta all nodes ip:port. Same as membersAddress in meta.config |
+
 
 And some configuration items in the **meta.conf** configuration file
 
diff --git a/distribution/bin/meta.sh b/distribution/bin/meta.sh
index 091fb56..6ba17f1 100644
--- a/distribution/bin/meta.sh
+++ b/distribution/bin/meta.sh
@@ -49,7 +49,7 @@
 BASEDIR=$HOME
 mkdir -p $BASEDIR/logs
 
-mainClass="org.apache.rocketmq.mqtt.meta.starter.Startup"
+mainClass="org.apache.rocketmq.mqtt.meta.starter.MetaStartup"
 
 
 function startup() {
diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml
index 13bd032..1edfa50 100644
--- a/mqtt-common/pom.xml
+++ b/mqtt-common/pom.xml
@@ -81,6 +81,10 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
index b393abe..656aee4 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
@@ -49,6 +49,19 @@
     CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count);
 
     /**
+     * pop messages
+     *
+     * @param consumerGroup
+     * @param firstTopic
+     * @param queue
+     * @param count
+     * @return
+     */
+    CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count);
+
+    void popAck(String lmqTopic, String consumerGroup, Message message);
+
+    /**
      * pull last messages
      *
      * @param firstTopic
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java
index 20777d0..bacfe29 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java
@@ -18,7 +18,6 @@
 package org.apache.rocketmq.mqtt.common.facade;
 
 import org.apache.rocketmq.mqtt.common.model.Message;
-import org.apache.rocketmq.mqtt.common.model.Subscription;
 
 import java.util.ArrayList;
 import java.util.concurrent.CompletableFuture;
@@ -30,5 +29,5 @@
 
     CompletableFuture<Message> getRetainedMessage(String preciseTopic);
 
-    CompletableFuture<ArrayList<Message>> getMsgsFromTrie(Subscription topicFilter);
+    CompletableFuture<ArrayList<Message>> getMsgsFromTrie(String firstTopic,String originTopicFilter);
 }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java
index d1d9e06..49d306c 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java
@@ -23,11 +23,12 @@
 public interface WillMsgPersistManager {
 
     CompletableFuture<Boolean> put(final String key, final String value);
+
     CompletableFuture<Boolean> delete(final String key);
 
     CompletableFuture<byte[]> get(final String key);
 
     CompletableFuture<Boolean> compareAndPut(final String key, final String expectValue, final String updateValue);
 
-    CompletableFuture<Map<String, String>> scan(final String startKey, final String endKey);
+    CompletableFuture<Map<String, String>> scan(final String startKey, final String endKey, int scanNum);
 }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgSender.java
similarity index 70%
copy from mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
copy to mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgSender.java
index e9635bf..8a3f5a0 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgSender.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.mqtt.common.meta;
+package org.apache.rocketmq.mqtt.common.facade;
 
-public class Constants {
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import org.apache.rocketmq.mqtt.common.model.StoreResult;
 
-    public static final String CATEGORY_RETAINED_MSG = "retainedMsg";
-    public static final String CATEGORY_WILL_MSG = "willMsg";
+import java.util.concurrent.CompletableFuture;
 
-    public static final String NOT_FOUND = "NOT_FOUND";
-
-    public static final String READ_INDEX_TYPE = "readIndexType";
+public interface WillMsgSender {
+    CompletableFuture<StoreResult> sendWillMsg(String clientId, MqttPublishMessage message);
 }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/MetaConstants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/MetaConstants.java
new file mode 100644
index 0000000..aad01f6
--- /dev/null
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/MetaConstants.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.common.meta;
+
+public class MetaConstants {
+
+    public static final String CATEGORY_RETAINED_MSG = "retainedMsg";
+    public static final String CATEGORY_WILL_MSG = "willMsg";
+    public static final String CATEGORY_HASH_KV = "hashKv";
+
+    public static final String NOT_FOUND = "NOT_FOUND";
+
+    public static final String READ_INDEX_TYPE = "readIndexType";
+    public static final String ANY_READ_TYPE = "anyRead";
+
+    public static final String OP_KV_GET = "get";
+    public static final String OP_KV_GET_HASH = "getHash";
+    public static final String OP_HASH_KV_FIELD = "field";
+    public static final String OP_KV_PUT = "put";
+    public static final String OP_KV_PUT_HASH = "putHash";
+    public static final String OP_KV_DEL = "del";
+    public static final String OP_KV_DEL_HASH = "delHash";
+
+    public static final String RETAIN_REQ_READ_PARAM_TOPIC = "topic";
+    public static final String RETAIN_REQ_READ_PARAM_FIRST_TOPIC = "firstTopic";
+    public static final String RETAIN_REQ_READ_PARAM_OPERATION_TRIE = "trie";
+    public static final String RETAIN_REQ_READ_PARAM_OPERATION_TOPIC = "topic";
+
+    public static final String RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC = "firstTopic";
+    public static final String RETAIN_REQ_WRITE_PARAM_TOPIC = "topic";
+    public static final String RETAIN_REQ_WRITE_PARAM_IS_EMPTY = "isEmpty";
+    public static final String RETAIN_REQ_WRITE_PARAM_EXPIRE = "expire";
+
+    public static final String WILL_REQ_READ_GET = "get";
+    public static final String WILL_REQ_READ_SCAN = "scan";
+    public static final String WILL_REQ_READ_SCAN_NUM = "scanNum";
+    public static final String WILL_REQ_READ_START_KEY = "startKey";
+    public static final String WILL_REQ_READ_END_KEY = "endKey";
+
+    public static final String WILL_REQ_WRITE_PUT = "put";
+    public static final String WILL_REQ_WRITE_DELETE = "delete";
+
+    public static final String WILL_REQ_WRITE_COMPARE_AND_PUT = "compareAndPut";
+    public static final String WILL_REQ_WRITE_EXPECT_VALUE = "expectValue";
+
+
+}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java
index d730684..9c0f603 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java
@@ -28,6 +28,7 @@
 
     public static final int RETAIN_RAFT_GROUP_INDEX = 0;
     public static final int WILL_RAFT_GROUP_INDEX = 1;
+    public static final int HASH_KV_RAFT_GROUP_INDEX = 2;
 
     static {
         RAFT_GROUPS = new String[RAFT_GROUP_NUM];
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index f6e404c..cd764d7 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -23,6 +23,7 @@
 
     public static final String PLUS_SIGN = "+";
     public static final String NUMBER_SIGN = "#";
+    public static final String DOLLAR_SIGN = "$";
     public static final String COLON = ":";
 
     public static final String P2P = "/p2p/";
@@ -55,5 +56,7 @@
     public static final byte CTRL_2 = '\u0002';
 
     public static final String NOT_FOUND = "NOT_FOUND";
+    public static final String SHARED_PREFIX = DOLLAR_SIGN + "share";
+    public static final String EMPTY_SHARE_NAME = "";
 
 }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
index e26f780..60d4ba8 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
@@ -24,6 +24,7 @@
 public class PullResult {
     public static final int PULL_SUCCESS = 301;
     public static final int PULL_OFFSET_MOVED = 302;
+    public static final int NO_NEW_MSG = 303;
     private int code = PULL_SUCCESS;
     private String remark;
     private List<Message> messageList;
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
index c10921f..47b7e69 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
@@ -51,6 +51,10 @@
         return TopicUtils.isP2pTopic(queueName);
     }
 
+    public boolean isShare() {
+        return TopicUtils.isSharedSubscription(queueName);
+    }
+
     public long getQueueId() {
         return queueId;
     }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
index cd2d380..8aa62fb 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
@@ -85,6 +85,17 @@
         return topicFilter != null ? topicFilter.hashCode() : 0;
     }
 
+    public boolean isShare() {
+        return TopicUtils.isSharedSubscription(topicFilter);
+    }
+
+    public String getSharedName() {
+        if (!isShare()) {
+            return null;
+        }
+        return TopicUtils.getSharedName(topicFilter);
+    }
+
     public String getTopicFilter() {
         return topicFilter;
     }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
index 166475f..6f5b40b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
@@ -116,6 +116,9 @@
         if (topics.startsWith(Constants.MQTT_TOPIC_DELIMITER)) {
             topics = topics.substring(1);
         }
+        if (topics.startsWith(Constants.SHARED_PREFIX)) {
+            topics = TopicUtils.getSharedTopicFilter(topics);
+        }
         String topic;
         String secondTopic = null;
         int index = topics.indexOf(Constants.MQTT_TOPIC_DELIMITER, 1);
@@ -185,4 +188,24 @@
     public static String wrapP2pLmq(String clientId) {
         return normalizeTopic(Constants.P2P + clientId);
     }
+
+    // shared subscription topic filter format: $share/{ShareName}/{filter}
+    public static boolean isSharedSubscription(String topicFilter) {
+        if (StringUtils.isEmpty(topicFilter)) {
+            return false;
+        }
+        if (!topicFilter.startsWith(Constants.SHARED_PREFIX)) {
+            return false;
+        }
+        String[] arr = topicFilter.split(Constants.MQTT_TOPIC_DELIMITER);
+        return arr.length > 2;
+    }
+
+    public static String getSharedName(String topicFilter) {
+        return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER)[1];
+    }
+
+    public static String getSharedTopicFilter(String topicFilter) {
+        return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER, 3)[2];
+    }
 }
diff --git a/mqtt-cs/pom.xml b/mqtt-cs/pom.xml
index 015f117..e3924a1 100644
--- a/mqtt-cs/pom.xml
+++ b/mqtt-cs/pom.xml
@@ -35,6 +35,10 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-core</artifactId>
         </dependency>
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
index 1b164ad..5299467 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
@@ -17,28 +17,16 @@
 
 package org.apache.rocketmq.mqtt.cs.channel;
 
-import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import org.apache.commons.lang3.StringUtils;
-
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.hook.HookResult;
-import org.apache.rocketmq.mqtt.common.meta.IpUtil;
-import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
-import org.apache.rocketmq.mqtt.common.model.Constants;
-import org.apache.rocketmq.mqtt.common.model.WillMessage;
-import org.apache.rocketmq.mqtt.common.util.HostInfo;
-import org.apache.rocketmq.mqtt.common.util.MessageUtil;
 import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.session.Session;
 import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
 import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
-import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
+import org.apache.rocketmq.mqtt.cs.session.loop.WillLoop;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -46,11 +34,8 @@
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 @Component
@@ -74,13 +59,7 @@
     private MqttMsgId mqttMsgId;
 
     @Resource
-    private WillMsgPersistManager willMsgPersistManager;
-
-    @Resource
-    private PublishProcessor publishProcessor;
-
-    private ThreadPoolExecutor executor;
-
+    private WillLoop willLoop;
 
     @PostConstruct
     public void init() {
@@ -92,14 +71,6 @@
                 closeConnect(channel, ChannelCloseFrom.SERVER, "ServerShutdown");
             }
         }));
-
-        executor = new ThreadPoolExecutor(
-                1,
-                1,
-                1,
-                TimeUnit.MINUTES,
-                new LinkedBlockingQueue<>(5000),
-                new ThreadFactoryImpl("DispatchWillToMQ_ "));
     }
 
     @Override
@@ -147,50 +118,7 @@
     public void closeConnect(Channel channel, ChannelCloseFrom from, String reason) {
         String clientId = ChannelInfo.getClientId(channel);
         String channelId = ChannelInfo.getId(channel);
-        String ip = IpUtil.getLocalAddressCompatible();
-
-        String willKey = ip + Constants.CTRL_1 + clientId;
-        CompletableFuture<byte[]> willMessageFuture = willMsgPersistManager.get(willKey);
-        willMessageFuture.whenComplete((willMessageByte, throwable) -> {
-            String content = new String(willMessageByte);
-            if (Constants.NOT_FOUND.equals(content)) {
-                return;
-            }
-
-            if (!"disconnect".equals(reason)) {
-                WillMessage willMessage = JSON.parseObject(content, WillMessage.class);
-
-                int mqttId = mqttMsgId.nextId(clientId);
-                MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
-                        willMessage.getQos(), mqttId, willMessage.isRetain());
-
-                Runnable runnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(buildMqttMessageUpContext(channel), mqttMessage);
-                        upstreamHookResult.whenComplete((hookResult, tb) -> {
-                            try {
-                                if (!hookResult.isSuccess()) {
-                                    executor.submit(this);
-                                } else {
-                                    willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
-                                        if (!resultDel || tbDel != null) {
-                                            logger.error("fail to delete will message key:{}", willKey);
-                                            return;
-                                        }
-                                        logger.info("connection close and send will, delete will message key {} successfully", willKey);
-                                    });
-                                }
-                            } catch (Throwable t) {
-                                logger.error("", t);
-                            }
-                        });
-                    }
-                };
-                executor.submit(runnable);
-            }
-        });
-
+        willLoop.closeConnect(channel, clientId, reason);
         if (clientId == null) {
             channelMap.remove(channelId);
             sessionLoop.unloadSession(clientId, channelId);
@@ -208,15 +136,6 @@
         logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason);
     }
 
-    private MqttMessageUpContext buildMqttMessageUpContext(Channel channel) {
-        MqttMessageUpContext context = new MqttMessageUpContext();
-        context.setClientId(ChannelInfo.getClientId(channel));
-        context.setChannelId(ChannelInfo.getId(channel));
-        context.setNode(HostInfo.getInstall().getAddress());
-        context.setNamespace(ChannelInfo.getNamespace(channel));
-        return context;
-    }
-
     @Override
     public void closeConnect(String channelId, String reason) {
         Channel channel = channelMap.get(channelId);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/WillLoopConf.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/WillLoopConf.java
new file mode 100644
index 0000000..c05292e
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/WillLoopConf.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.config;
+
+public class WillLoopConf {
+    private int maxScanNodeNum = 100;
+    private int maxScanClientNum = 100000;
+    private int scanNumOnce = 100;
+    private boolean enableWill = true;
+
+    public int getMaxScanNodeNum() {
+        return maxScanNodeNum;
+    }
+
+    public void setMaxScanNodeNum(int maxScanNodeNum) {
+        this.maxScanNodeNum = maxScanNodeNum;
+    }
+
+    public int getMaxScanClientNum() {
+        return maxScanClientNum;
+    }
+
+    public void setMaxScanClientNum(int maxScanClientNum) {
+        this.maxScanClientNum = maxScanClientNum;
+    }
+
+    public int getScanNumOnce() {
+        return scanNumOnce;
+    }
+
+    public void setScanNumOnce(int scanNumOnce) {
+        this.scanNumOnce = scanNumOnce;
+    }
+
+    public boolean isEnableWill() {
+        return enableWill;
+    }
+
+    public void setEnableWill(boolean enableWill) {
+        this.enableWill = enableWill;
+    }
+}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
index 0fa60e1..247ecf9 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
@@ -34,6 +34,7 @@
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.facotry.MqttMessageFactory;
 import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.apache.rocketmq.mqtt.cs.session.loop.WillLoop;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -54,6 +55,9 @@
     @Resource
     private SessionLoop sessionLoop;
 
+    @Resource
+    private WillLoop willLoop;
+
     private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));
 
     @Override
@@ -116,7 +120,7 @@
                 }
 
                 willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(), variableHeader.isWillRetain(), variableHeader.willQos());
-                sessionLoop.addWillMessage(channel, willMessage);
+                willLoop.addWillMessage(channel, willMessage);
             }
 
         } catch (Exception e) {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
index 19ffac4..fb6ecf7 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
@@ -144,7 +144,6 @@
 
 
     private void sendRetainMessage(ChannelHandlerContext ctx, Set<Subscription> subscriptions) throws InterruptedException, RemotingException, org.apache.rocketmq.remoting.exception.RemotingException {
-
         String clientId = ChannelInfo.getClientId(ctx.channel());
         Session session = sessionLoop.getSession(ChannelInfo.getId(ctx.channel()));
         Set<Subscription> preciseTopics = new HashSet<>();
@@ -169,8 +168,7 @@
         }
 
         for (Subscription subscription : wildcardTopics) {
-
-            CompletableFuture<ArrayList<Message>> future = retainedPersistManager.getMsgsFromTrie(subscription);
+            CompletableFuture<ArrayList<Message>> future = retainedPersistManager.getMsgsFromTrie(subscription.toFirstTopic(), subscription.getTopicFilter());
             future.whenComplete((msgsList, throwable) -> {
                 for (Message msg : msgsList) {
                     if (msg == null) {
@@ -179,7 +177,6 @@
                     pushAction._sendMessage(session, clientId, subscription, msg);
                 }
             });
-
         }
     }
 
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
index 218b460..9d656f1 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
@@ -290,6 +290,9 @@
         if (!subscriptions.containsKey(subscription.getTopicFilter())) {
             return false;
         }
+        if (subscription.isShare()) {
+            return true;
+        }
         if (!sendingMessages.containsKey(subscription)) {
             sendingMessages.putIfAbsent(subscription, new ConcurrentHashMap<>(16));
         }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 15d4ed2..70592a3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -21,6 +21,8 @@
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -56,6 +58,8 @@
     @Resource
     private ConnectConf connectConf;
 
+    @Resource
+    private LmqQueueStore lmqQueueStore;
 
     public void messageArrive(Session session, Subscription subscription, Queue queue) {
         if (session == null) {
@@ -163,6 +167,9 @@
             if (subscription.isRetry()) {
                 message.setRetry(message.getRetry() + 1);
                 logger.warn("retryPush:{},{},{}", session.getClientId(), message.getMsgId(), message.getRetry());
+            } else if (subscription.isShare()) {
+                String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/","%");
+                lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), message);
             }
         });
     }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index 89c6a71..771d09f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -111,7 +111,7 @@
         if (queue == null) {
             return;
         }
-        if (queue.isP2p() || queue.isRetry()) {
+        if (queue.isP2p() || queue.isRetry() || queue.isShare()) {
             return;
         }
         addLoadEvent(queue, pair.getRight());
@@ -188,6 +188,13 @@
             callbackResult(pullResult, callBackResult);
             return DONE;
         }
+
+        if (subscription.isShare()) {
+            CompletableFuture<PullResult> pullResult = lmqQueueStore.popMessage(subscription.getSharedName(), toFirstTopic(subscription), queue, count);
+            callbackResult(pullResult, callBackResult);
+            return DONE;
+        }
+
         CacheEntry cacheEntry = cache.getIfPresent(queue);
         if (cacheEntry == null) {
             StatUtil.addPv("NoPullCache", 1);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java
index 3b969e8..65f995f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java
@@ -21,7 +21,6 @@
 import io.netty.channel.Channel;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
-import org.apache.rocketmq.mqtt.common.model.WillMessage;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
 import org.apache.rocketmq.mqtt.cs.session.Session;
 
@@ -96,5 +95,4 @@
      */
     void notifyPullMessage(Session session, Subscription subscription, Queue queue);
 
-    void addWillMessage(Channel channel, WillMessage willMessage);
 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index 0c8bc17..ad62abc 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.mqtt.cs.session.loop;
 
-import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import org.apache.commons.lang3.StringUtils;
@@ -26,13 +25,10 @@
 import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
 import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.meta.IpUtil;
-import org.apache.rocketmq.mqtt.common.model.Constants;
 import org.apache.rocketmq.mqtt.common.model.PullResult;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
-import org.apache.rocketmq.mqtt.common.model.WillMessage;
 import org.apache.rocketmq.mqtt.common.util.SpringUtils;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
@@ -393,6 +389,7 @@
                 throw new RuntimeException(
                         "invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
             }
+            logger.info("session loop impl doing notifyPullMessage queueFresh.freshQueue({}, {}})", session, subscription);
             queueFresh.freshQueue(session, subscription);
             pullMessage(session, subscription, queue);
             return;
@@ -411,32 +408,6 @@
         }
     }
 
-    @Override
-    public void addWillMessage(Channel channel, WillMessage willMessage) {
-        Session session = getSession(ChannelInfo.getId(channel));
-        String clientId = ChannelInfo.getClientId(channel);
-        String ip = IpUtil.getLocalAddressCompatible();
-
-        if (session == null) {
-            return;
-        }
-        if (willMessage == null) {
-            return;
-        }
-
-        String message = JSON.toJSONString(willMessage);
-        String willKey = ip + Constants.CTRL_1 + clientId;
-
-        // key: ip + clientId; value: WillMessage
-        willMsgPersistManager.put(willKey, message).whenComplete((result, throwable) -> {
-            if (!result || throwable != null) {
-                logger.error("fail to put will message key {} value {}", willKey, willMessage);
-                return;
-            }
-            logger.debug("put will message key {} value {} successfully", willKey, message);
-        });
-    }
-
     private String eventQueueKey(Session session, Queue queue) {
         StringBuilder sb = new StringBuilder();
         sb.append(ChannelInfo.getId(session.getChannel()));
@@ -540,7 +511,6 @@
                     }
                 } else if (PullResult.PULL_OFFSET_MOVED == pullResult.getCode()) {
                     queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
-                    queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
                     session.markPersistOffsetFlag(true);
                     pullMessage(session, subscription, queue);
                 } else {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
index e27bb47..a1dd337 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
@@ -18,17 +18,19 @@
 package org.apache.rocketmq.mqtt.cs.session.loop;
 
 import com.alibaba.fastjson.JSON;
+import io.netty.channel.Channel;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.facade.WillMsgSender;
 import org.apache.rocketmq.mqtt.common.meta.IpUtil;
 import org.apache.rocketmq.mqtt.common.model.Constants;
-import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.common.model.StoreResult;
 import org.apache.rocketmq.mqtt.common.model.WillMessage;
 import org.apache.rocketmq.mqtt.common.util.MessageUtil;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
+import org.apache.rocketmq.mqtt.cs.config.WillLoopConf;
 import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
-import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -42,6 +44,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Component
 public class WillLoop {
@@ -49,6 +52,7 @@
     private ScheduledThreadPoolExecutor aliveService = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("check_alive_thread_"));
     private long checkAliveIntervalMillis = 5 * 1000;
     private ThreadPoolExecutor executor;
+    private WillLoopConf willLoopConf = new WillLoopConf();
 
     @Resource
     private WillMsgPersistManager willMsgPersistManager;
@@ -57,7 +61,11 @@
     private MqttMsgId mqttMsgId;
 
     @Resource
-    private PublishProcessor publishProcessor;
+    private WillMsgSender willMsgSender;
+
+    public void setWillLoopConf(WillLoopConf willLoopConf) {
+        this.willLoopConf = willLoopConf;
+    }
 
     @PostConstruct
     public void init() {
@@ -75,9 +83,12 @@
 
     private void csLoop() {
         try {
+            if (!willLoopConf.isEnableWill()) {
+                return;
+            }
             String ip = IpUtil.getLocalAddressCompatible();
-            String csKey = Constants.CS_ALIVE + Constants.CTRL_1 + ip;
-            String masterKey = Constants.CS_MASTER;
+            String csKey = wrapAliveCsKeyPrefix() + Constants.CTRL_1 + ip;
+            String masterKey = wrapMasterKey();
             long currentTime = System.currentTimeMillis();
 
             willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
@@ -103,6 +114,14 @@
         }
     }
 
+    protected String wrapMasterKey() {
+        return Constants.CS_MASTER;
+    }
+
+    protected String wrapAliveCsKeyPrefix() {
+        return Constants.CS_ALIVE;
+    }
+
     private boolean masterHasDown(String masterValue) {
         String[] ipTime = masterValue.split(Constants.COLON);
         if (ipTime.length < 2) {
@@ -115,13 +134,16 @@
 
     private void masterLoop() {
         try {
+            if (!willLoopConf.isEnableWill()) {
+                return;
+            }
             String ip = IpUtil.getLocalAddressCompatible();
             if (ip == null) {
                 logger.error("can not get local ip");
                 return;
             }
 
-            willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
+            willMsgPersistManager.get(wrapMasterKey()).whenComplete((result, throwable) -> {
                 if (result == null || throwable != null) {
                     logger.error("fail to get CS_MASTER", throwable);
                     return;
@@ -139,16 +161,16 @@
                 }
                 // master keep alive
                 long currentTime = System.currentTimeMillis();
-                willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
+                willMsgPersistManager.compareAndPut(wrapMasterKey(), content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
                     if (!rs || tb != null) {
                         logger.error("{} fail to update master", ip, tb);
                     }
                 });
 
                 // master to check all cs state
-                String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
-                String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
-                willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
+                String startCSKey = wrapAliveCsKeyPrefix() + Constants.CTRL_0;
+                String endCSKey = wrapAliveCsKeyPrefix() + Constants.CTRL_2;
+                willMsgPersistManager.scan(startCSKey, endCSKey, willLoopConf.getMaxScanNodeNum()).whenComplete((rs, tb) -> {
                     if (rs == null || tb != null) {
                         logger.error("{} master fail to scan cs", ip, tb);
                         return;
@@ -162,10 +184,10 @@
                     for (Map.Entry<String, String> entry : rs.entrySet()) {
                         String key = entry.getKey();
                         String value = entry.getValue();
-                        logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
+                        logger.debug("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
                         if (System.currentTimeMillis() - Long.parseLong(value) > 10 * checkAliveIntervalMillis) {
                             // the cs has down
-                            String csIp = key.substring((Constants.CS_ALIVE + Constants.CTRL_1).length());
+                            String csIp = key.substring((wrapAliveCsKeyPrefix() + Constants.CTRL_1).length());
                             handleShutDownCS(csIp);
 
                             willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
@@ -185,53 +207,100 @@
     }
 
     private void handleShutDownCS(String ip) {
+        AtomicInteger loopNum = new AtomicInteger(
+                Math.max(1, willLoopConf.getMaxScanClientNum()) / willLoopConf.getScanNumOnce() + 1);
+        _handleShutDownCS(ip, loopNum);
+    }
+
+    private void _handleShutDownCS(String ip, AtomicInteger loopNum) {
+        if (loopNum.getAndDecrement() <= 0) {
+            return;
+        }
         String startClientKey = ip + Constants.CTRL_0;
         String endClientKey = ip + Constants.CTRL_2;
-        willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
-            if (willMap == null || throwable != null) {
-                logger.error("{} master fail to scan cs", ip, throwable);
-                return;
-            }
-
-            if (willMap.size() == 0) {
-                logger.info("the cs:{} has no will", ip);
-                return;
-            }
-
-            for (Map.Entry<String, String> entry : willMap.entrySet()) {
-                logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
-                String willKey = entry.getKey();
-                String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
-
-                WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
-                int mqttId = mqttMsgId.nextId(clientId);
-                MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
-                        willMessage.getQos(), mqttId, willMessage.isRetain());
-                Runnable runnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
-                        upstreamHookResult.whenComplete((hookResult, tb) -> {
-                            try {
-                                if (!hookResult.isSuccess()) {
-                                    executor.submit(this);
-                                } else {
-                                    willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
-                                        if (!resultDel || tbDel != null) {
-                                            logger.error("fail to delete will message key:{}", willKey);
-                                            return;
-                                        }
-                                        logger.info("delete will message key {} successfully", willKey);
-                                    });
-                                }
-                            } catch (Throwable t) {
-                                logger.error("", t);
-                            }
-                        });
+        willMsgPersistManager
+                .scan(startClientKey, endClientKey, willLoopConf.getScanNumOnce())
+                .whenComplete((willMap, throwable) -> {
+                    if (willMap == null || throwable != null) {
+                        logger.error("{} master fail to scan cs", ip, throwable);
+                        return;
                     }
-                };
-                executor.submit(runnable);
+                    for (Map.Entry<String, String> entry : willMap.entrySet()) {
+                        logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
+                        String willKey = entry.getKey();
+                        String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
+                        WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
+                        sendWillMessage(willKey, clientId, willMessage);
+                    }
+                    if (willMap.size() >= willLoopConf.getScanNumOnce()) {
+                        _handleShutDownCS(ip, loopNum);
+                    }
+                });
+    }
+
+    public void closeConnect(Channel channel, String clientId, String reason) {
+        String ip = IpUtil.getLocalAddressCompatible();
+        String willKey = ip + Constants.CTRL_1 + clientId;
+        CompletableFuture<byte[]> willMessageFuture = willMsgPersistManager.get(willKey);
+        willMessageFuture.whenComplete((willMessageByte, throwable) -> {
+            String content = new String(willMessageByte);
+            if (Constants.NOT_FOUND.equals(content)) {
+                return;
             }
+
+            if (!"disconnect".equals(reason)) {
+                WillMessage willMessage = JSON.parseObject(content, WillMessage.class);
+                sendWillMessage(willKey, clientId, willMessage);
+            }
+        });
+    }
+
+    private void sendWillMessage(String willKey, String clientId, WillMessage willMessage) {
+        int mqttId = mqttMsgId.nextId(clientId);
+        MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
+                willMessage.getQos(), mqttId, willMessage.isRetain());
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                CompletableFuture<StoreResult> r = willMsgSender.sendWillMsg(clientId, mqttMessage);
+                r.whenComplete((hookResult, tb) -> {
+                    try {
+                        if (tb != null) {
+                            logger.error("sendWillMsg failed {},{}", clientId, willKey, tb);
+                        } else {
+                            willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
+                                if (!resultDel || tbDel != null) {
+                                    logger.error("fail to delete will message key:{}", willKey);
+                                    return;
+                                }
+                                logger.info("delete will message key {} successfully", willKey);
+                            });
+                        }
+                    } catch (Throwable t) {
+                        logger.error("", t);
+                    }
+                });
+            }
+        };
+        executor.submit(runnable);
+    }
+
+    public void addWillMessage(Channel channel, WillMessage willMessage) {
+        String clientId = ChannelInfo.getClientId(channel);
+        String ip = IpUtil.getLocalAddressCompatible();
+        if (willMessage == null) {
+            return;
+        }
+        String message = JSON.toJSONString(willMessage);
+        String willKey = ip + Constants.CTRL_1 + clientId;
+
+        // key: ip + clientId; value: WillMessage
+        willMsgPersistManager.put(willKey, message).whenComplete((result, throwable) -> {
+            if (!result || throwable != null) {
+                logger.error("fail to put will message key {} value {}", willKey, willMessage);
+                return;
+            }
+            logger.debug("put will message key {} value {} successfully", willKey, message);
         });
     }
 
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
index 7fa680f..b861fda 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
@@ -17,15 +17,13 @@
 
 package org.apache.rocketmq.mqtt.cs.starter;
 
-import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.mqtt.common.util.SpringUtils;
+
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 
 public class Startup {
-
     public static void main(String[] args) {
-        System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
 
         ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
         SpringUtils.SetClassPathXmlApplicationContext(applicationContext);
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java
index 536bd87..337a3bc 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java
@@ -22,6 +22,7 @@
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -43,8 +44,10 @@
     private String secretKey;
 
     private String metaAddr;
+    private long retainMsgExpire = 3 * 24 * 60 * 60 * 1000L;
 
-    public ServiceConf() throws IOException {
+    @PostConstruct
+    public void init() throws IOException {
         ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
         InputStream in = classPathResource.getInputStream();
         Properties properties = new Properties();
@@ -148,4 +151,11 @@
         this.secretKey = secretKey;
     }
 
+    public long getRetainMsgExpire() {
+        return retainMsgExpire;
+    }
+
+    public void setRetainMsgExpire(long retainMsgExpire) {
+        this.retainMsgExpire = retainMsgExpire;
+    }
 }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java
index 79e7047..c976f1f 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java
@@ -24,13 +24,13 @@
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
 import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
index 95368ce..9fe2695 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
@@ -20,12 +20,12 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
 import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java
index 5814d44..e32e735 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java
@@ -48,11 +48,11 @@
 @Component
 public class MetaRpcClient {
     private static Logger logger = LoggerFactory.getLogger(MetaRpcClient.class);
+    private static ScheduledExecutorService raftClientExecutor = Executors.newSingleThreadScheduledExecutor();
     private RouteTable rt;
     private Configuration conf;
     private CliClientServiceImpl cliClientService;
-    private static ScheduledExecutorService raftClientExecutor = Executors.newSingleThreadScheduledExecutor();
-    public String[] raftGroups;
+    private String[] raftGroups;
 
     @Resource
     private ServiceConf serviceConf;
@@ -68,8 +68,7 @@
         for (String groupId : raftGroups) {
             rt.updateConfiguration(groupId, conf);
         }
-        refreshLeader();
-        raftClientExecutor.scheduleAtFixedRate(() -> refreshLeader(), 3, 3, TimeUnit.SECONDS);
+        raftClientExecutor.scheduleAtFixedRate(() -> refreshLeader(), 1, 3, TimeUnit.SECONDS);
     }
 
     public void initRpcServer() {
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java
index eef7f82..833084c 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java
@@ -21,11 +21,15 @@
 import com.alipay.sofa.jraft.rpc.InvokeCallback;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.Unpooled;
+import io.netty.util.CharsetUtil;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
 import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.MessageUtil;
+import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,9 +42,17 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_RETAINED_MSG;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.NOT_FOUND;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_RETAINED_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.NOT_FOUND;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_OPERATION_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_OPERATION_TRIE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_IS_EMPTY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_EXPIRE;
 import static org.apache.rocketmq.mqtt.common.meta.RaftUtil.RETAIN_RAFT_GROUP_INDEX;
 
 
@@ -49,17 +61,24 @@
     private static Logger logger = LoggerFactory.getLogger(RetainedMsgClient.class);
 
     @Resource
-    private ServiceConf serviceConf;
-
-    @Resource
     private MetaRpcClient metaRpcClient;
 
+    @Resource
+    public ServiceConf serviceConf;
+
     public void setRetainedMsg(String topic, Message msg, CompletableFuture<Boolean> future) throws RemotingException, InterruptedException {
+        _setRetainedMsg(topic, msg, null, future);
+    }
+
+    public void _setRetainedMsg(String topic, Message msg, Long expire, CompletableFuture<Boolean> future) throws RemotingException, InterruptedException {
         String groupId = whichGroup();
         HashMap<String, String> option = new HashMap<>();
-        option.put("topic", topic);
-        option.put("firstTopic", msg.getFirstTopic());
-        option.put("isEmpty", String.valueOf(msg.isEmpty()));
+        option.put(RETAIN_REQ_WRITE_PARAM_TOPIC, topic);
+        option.put(RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC, msg.getFirstTopic());
+        option.put(RETAIN_REQ_WRITE_PARAM_IS_EMPTY, String.valueOf(msg.isEmpty()));
+        if (expire != null) {
+            option.put(RETAIN_REQ_WRITE_PARAM_EXPIRE, String.valueOf(expire));
+        }
 
         logger.debug("SetRetainedMsg option:" + option);
 
@@ -99,14 +118,14 @@
         String groupId = whichGroup();
         HashMap<String, String> option = new HashMap<>();
 
-        option.put("firstTopic", firstTopic);
-        option.put("topic", topic);
+        option.put(RETAIN_REQ_READ_PARAM_FIRST_TOPIC, firstTopic);
+        option.put(RETAIN_REQ_READ_PARAM_TOPIC, topic);
 
         logger.debug("GetRetainedMsgsFromTrie option:" + option);
 
         final ReadRequest request = ReadRequest.newBuilder()
                 .setGroup(groupId)
-                .setOperation("trie")
+                .setOperation(RETAIN_REQ_READ_PARAM_OPERATION_TRIE)
                 .setType(READ_INDEX_TYPE)
                 .putAllExtData(option)
                 .setCategory(CATEGORY_RETAINED_MSG)
@@ -126,7 +145,18 @@
                     ArrayList<Message> resultList = new ArrayList<>();
                     for (ByteString tmp : datalistList) {
                         try {
-                            resultList.add(Message.copyFromStoreMessage(StoreMessage.parseFrom(tmp.toByteArray())));
+                            Message message = Message.copyFromStoreMessage(StoreMessage.parseFrom(tmp.toByteArray()));
+                            if (System.currentTimeMillis() - message.getBornTimestamp() > serviceConf.getRetainMsgExpire()) {
+                                message.setPayload(Unpooled.copiedBuffer(MessageUtil.EMPTYSTRING, CharsetUtil.UTF_8).array());
+                                message.setEmpty(true);
+                                try {
+                                    _setRetainedMsg(TopicUtils.normalizeTopic(message.getOriginTopic()), message, serviceConf.getRetainMsgExpire(), new CompletableFuture<>());
+                                } catch (Exception e) {
+                                    logger.error("", e);
+                                }
+                                continue;
+                            }
+                            resultList.add(message);
                         } catch (InvalidProtocolBufferException e) {
                             future.complete(null);
                             throw new RuntimeException(e);
@@ -149,11 +179,11 @@
     public void GetRetainedMsg(String topic, CompletableFuture<Message> future) throws RemotingException, InterruptedException {
         String groupId = whichGroup();
         HashMap<String, String> option = new HashMap<>();
-        option.put("topic", topic);
+        option.put(RETAIN_REQ_READ_PARAM_TOPIC, topic);
 
         final ReadRequest request = ReadRequest.newBuilder()
                 .setGroup(groupId)
-                .setOperation("topic")
+                .setOperation(RETAIN_REQ_READ_PARAM_OPERATION_TOPIC)
                 .setType(READ_INDEX_TYPE)
                 .putAllExtData(option)
                 .setCategory(CATEGORY_RETAINED_MSG)
@@ -177,7 +207,17 @@
                     Message message = null;
                     try {
                         message = Message.copyFromStoreMessage(StoreMessage.parseFrom(rsp.getData().toByteArray()));
-                    } catch (InvalidProtocolBufferException e) {
+                        if (System.currentTimeMillis() - message.getBornTimestamp() > serviceConf.getRetainMsgExpire()) {
+                            message.setPayload(Unpooled.copiedBuffer(MessageUtil.EMPTYSTRING, CharsetUtil.UTF_8).array());
+                            message.setEmpty(true);
+                            try {
+                                _setRetainedMsg(topic, message, serviceConf.getRetainMsgExpire(), new CompletableFuture<>());
+                            } catch (Exception e) {
+                                logger.error("", e);
+                            }
+                            message = null;
+                        }
+                    } catch (Exception e) {
                         future.complete(null);
                         throw new RuntimeException(e);
                     }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java
index a2e0901..87c797d 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java
@@ -20,19 +20,14 @@
 
 
 import com.alipay.sofa.jraft.error.RemotingException;
-
 import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
 import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager;
 import org.apache.rocketmq.mqtt.common.model.Message;
-import org.apache.rocketmq.mqtt.common.model.Subscription;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Resource;
 import java.util.ArrayList;
-
-
 import java.util.concurrent.CompletableFuture;
 
 
@@ -46,6 +41,7 @@
     @Resource
     private RetainedMsgClient retainedMsgClient;
 
+
     public void init() {
     }
 
@@ -81,11 +77,8 @@
         return future;
     }
 
-    public CompletableFuture<ArrayList<Message>> getMsgsFromTrie(Subscription subscription) {
-        String firstTopic = subscription.toFirstTopic();
-        String originTopicFilter = subscription.getTopicFilter();
+    public CompletableFuture<ArrayList<Message>> getMsgsFromTrie(String firstTopic, String originTopicFilter) {
         logger.debug("firstTopic={} originTopicFilter={}", firstTopic, originTopicFilter);
-
         CompletableFuture<ArrayList<Message>> future = new CompletableFuture<>();
         try {
             retainedMsgClient.GetRetainedMsgsFromTrie(firstTopic, originTopicFilter, future);
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java
index 207b461..7b0412e 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java
@@ -29,8 +29,17 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_WILL_MSG;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_WILL_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_END_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_GET;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN_NUM;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_START_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_COMPARE_AND_PUT;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_DELETE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_EXPECT_VALUE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_PUT;
 import static org.apache.rocketmq.mqtt.common.meta.RaftUtil.WILL_RAFT_GROUP_INDEX;
 
 
@@ -48,7 +57,7 @@
                 setGroup(groupId).
                 setKey(key).
                 setData(ByteString.copyFrom(value.getBytes())).
-                setOperation("put").
+                setOperation(WILL_REQ_WRITE_PUT).
                 setCategory(CATEGORY_WILL_MSG).
                 build();
 
@@ -74,7 +83,7 @@
         final WriteRequest request = WriteRequest.newBuilder().
                 setGroup(groupId).
                 setKey(key).
-                setOperation("delete").
+                setOperation(WILL_REQ_WRITE_DELETE).
                 setCategory(CATEGORY_WILL_MSG).
                 build();
 
@@ -100,7 +109,7 @@
         final ReadRequest request = ReadRequest.newBuilder().
                 setGroup(groupId).
                 setKey(key).
-                setOperation("get").
+                setOperation(WILL_REQ_READ_GET).
                 setType(READ_INDEX_TYPE).
                 setCategory(CATEGORY_WILL_MSG).
                 build();
@@ -127,8 +136,8 @@
                 setGroup(groupId).
                 setKey(key).
                 setData(ByteString.copyFrom(updateValue.getBytes())).
-                setOperation("compareAndPut").
-                putExtData("expectValue", expectValue).
+                setOperation(WILL_REQ_WRITE_COMPARE_AND_PUT).
+                putExtData(WILL_REQ_WRITE_EXPECT_VALUE, expectValue).
                 setCategory(CATEGORY_WILL_MSG).
                 build();
 
@@ -149,13 +158,14 @@
         }, 5000);
     }
 
-    public void scan(final String startKey, final String endKey, CompletableFuture<Map<String, String>> future) throws Exception {
+    public void scan(final String startKey, final String endKey, int scanNum, CompletableFuture<Map<String, String>> future) throws Exception {
         String groupId = whichGroup();
         final ReadRequest request = ReadRequest.newBuilder().
                 setGroup(groupId).
-                setOperation("scan").
-                putExtData("startKey", startKey).
-                putExtData("endKey", endKey).
+                setOperation(WILL_REQ_READ_SCAN).
+                putExtData(WILL_REQ_READ_START_KEY, startKey).
+                putExtData(WILL_REQ_READ_END_KEY, endKey).
+                putExtData(WILL_REQ_READ_SCAN_NUM, String.valueOf(scanNum)).
                 setType(READ_INDEX_TYPE).
                 setCategory(CATEGORY_WILL_MSG).
                 build();
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java
index 4cb5c03..504ed8b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java
@@ -91,10 +91,10 @@
     }
 
     @Override
-    public CompletableFuture<Map<String, String>> scan(String startKey, String endKey) {
+    public CompletableFuture<Map<String, String>> scan(String startKey, String endKey, int scanNum) {
         CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
         try {
-            willMsgClient.scan(startKey, endKey, future);
+            willMsgClient.scan(startKey, endKey, scanNum, future);
             return future;
         } catch (Exception e) {
             future.completeExceptionally(e);
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index a6327b8..e027fa5 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -117,6 +117,8 @@
 
     private void refresh() throws MQClientException {
         Set<String> tmp = metaPersistManager.getAllFirstTopics();
+        logger.info("Notify Manager is refreshing, all first topic is " + tmp);
+
         if (tmp == null || tmp.isEmpty()) {
             return;
         }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
index e9f87e6..094150c 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
@@ -22,9 +22,6 @@
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
@@ -32,6 +29,9 @@
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
 import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 7511fdd..49fe87e 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -20,7 +20,12 @@
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -34,13 +39,12 @@
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.model.Constants;
@@ -58,6 +62,11 @@
 import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
 import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -70,12 +79,14 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @Component
 public class LmqQueueStoreManager implements LmqQueueStore {
     private static Logger logger = LoggerFactory.getLogger(LmqQueueStoreManager.class);
     private PullAPIWrapper pullAPIWrapper;
+    private MQClientInstance mQClientFactory;
     private DefaultMQPullConsumer defaultMQPullConsumer;
     private DefaultMQProducer defaultMQProducer;
     private String consumerGroup = MixAll.CID_RMQ_SYS_PREFIX + "LMQ_PULL";
@@ -93,6 +104,7 @@
         defaultMQPullConsumer.setConsumerPullTimeoutMillis(2000);
         defaultMQPullConsumer.start();
         pullAPIWrapper = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
+        mQClientFactory = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
 
         defaultMQProducer = MqFactory.buildDefaultMQProducer("GID_LMQ_SEND", serviceConf.getProperties());
         defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
@@ -155,6 +167,11 @@
                             new TypeReference<Map<String, String>>() {
                             }));
         }
+
+        if (StringUtils.isNotBlank(mqMessage.getProperty(MessageConst.PROPERTY_POP_CK))) {
+            message.getUserProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, mqMessage.getProperty(MessageConst.PROPERTY_POP_CK));
+        }
+
         return message;
     }
 
@@ -445,7 +462,8 @@
 
         if (brokerAddr != null) {
             try {
-                return mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, lmqTopic, (int) queue.getQueueId(), 3000L);
+                MessageQueue messageQueue = new MessageQueue(lmqTopic, queue.getBrokerName(),  (int) queue.getQueueId());
+                return mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, messageQueue, 3000L);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
@@ -453,4 +471,169 @@
 
         throw new MQClientException("The broker[" + queue.getBrokerName() + "] not exist", null);
     }
+
+    @Override
+    public CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count) {
+        CompletableFuture<PullResult> result = new CompletableFuture<>();
+        long start = System.currentTimeMillis();
+        PopCallback popCallback = new PopCallback() {
+            @Override
+            public void onSuccess(PopResult popResult) {
+                if (popResult == null) {
+                    logger.warn("PopResult is null, just wait retry");
+                    return;
+                }
+
+                PullResult lmqPullResult = new PullResult();
+                if (PopStatus.FOUND == popResult.getPopStatus()) {
+                    lmqPullResult.setCode(PullResult.PULL_SUCCESS);
+                    List<MessageExt> messageExtList = popResult.getMsgFoundList();
+                    if (messageExtList != null && !messageExtList.isEmpty()) {
+                        List<Message> messageList = messageExtList.stream()
+                            .map(messageExt -> toLmqMessage(queue, messageExt))
+                            .collect(Collectors.toList());
+                        lmqPullResult.setMessageList(messageList);
+                    }
+                } else {
+                    lmqPullResult.setCode(PullResult.NO_NEW_MSG);
+                }
+
+                result.complete(lmqPullResult);
+
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPop", rt);
+                collectLmqReadWriteMatchActionRt("lmqPop", rt, true);
+                StatUtil.addPv(popResult.getPopStatus().name(), 1);
+                try {
+                    MqttMetricsCollector.collectPullStatusTps(1, popResult.getPopStatus().name());
+                } catch (Throwable e) {
+                    logger.error("collect prometheus error", e);
+                }
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                logger.error("pop message from {} error", queue, e);
+                result.completeExceptionally(e);
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPop", rt, false);
+                collectLmqReadWriteMatchActionRt("lmqPop", rt, false);
+            }
+        };
+
+        try {
+            String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
+            MessageQueue firstTopicQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
+            popKernelImpl(lmqTopic, firstTopicQueue, consumerGroup, ExpressionType.TAG, "*",
+                60000, count, ConsumeInitMode.MAX, false, 15000, 15000, popCallback);
+        } catch (Throwable e) {
+            result.completeExceptionally(e);
+        }
+
+        return result;
+    }
+
+    public void popKernelImpl(
+        final String lmqTopic,
+        final MessageQueue mq,
+        final String consumerGroup,
+        final String expressionType,
+        final String subExpression,
+        final long invisibleTime,
+        final long maxNums,
+        final int initMode,
+        final boolean order,
+        final long pollTime,
+        final long timeoutMillis,
+        final PopCallback popCallback
+    ) throws RemotingException, InterruptedException, MQClientException {
+        FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+        if (null == findBrokerResult) {
+            mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+            if (null == findBrokerResult) {
+                throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+            }
+        }
+
+        PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setTopic(lmqTopic);
+        requestHeader.setQueueId(mq.getQueueId());
+        requestHeader.setMaxMsgNums((int) maxNums);
+        requestHeader.setInvisibleTime(invisibleTime);
+        requestHeader.setInitMode(initMode);
+        requestHeader.setExpType(expressionType);
+        requestHeader.setExp(subExpression);
+        requestHeader.setOrder(order);
+        requestHeader.setPollTime(pollTime);
+        requestHeader.setBornTime(System.currentTimeMillis());
+
+        mQClientFactory.getMQClientAPIImpl().popMessageAsync(mq.getBrokerName(), findBrokerResult.getBrokerAddr(),
+            requestHeader, timeoutMillis, popCallback);
+    }
+
+    public void popAck(String lmqTopic, String consumerGroup, Message message) {
+        long start = System.currentTimeMillis();
+        AckCallback ackCallback = new AckCallback() {
+            @Override
+            public void onSuccess(AckResult ackResult) {
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPopAck", rt);
+                collectLmqReadWriteMatchActionRt("lmqPopAck", rt, true);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                    popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, this);
+                } catch (Exception ignore) {
+                    logger.warn("popAck {} message error", lmqTopic, e);
+                }
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPopAck", rt, false);
+                collectLmqReadWriteMatchActionRt("lmqPopAck", rt, false);
+            }
+        };
+
+        try {
+            popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, ackCallback);
+        } catch (Exception e) {
+            logger.error("pop ack error", e);
+        }
+    }
+
+    public void popAckKernelImpl(
+        final String lmqTopic,
+        final String consumerGroup,
+        final Message message,
+        final long timeoutMillis,
+        final AckCallback ackCallback
+    ) throws RemotingException, MQBrokerException, MQClientException, InterruptedException {
+        String extraInfo = message.getUserProperty(MessageConst.PROPERTY_POP_CK);
+        String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+
+        String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+        FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+        if (null == findBrokerResult) {
+            mQClientFactory.updateTopicRouteInfoFromNameServer(message.getFirstTopic());
+            findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+            if (null == findBrokerResult) {
+                throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+            }
+        }
+
+        int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+        long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
+
+        AckMessageRequestHeader ackMessageRequestHeader = new AckMessageRequestHeader();
+        ackMessageRequestHeader.setTopic(lmqTopic);
+        ackMessageRequestHeader.setQueueId(queueId);
+        ackMessageRequestHeader.setOffset(queueOffset);
+        ackMessageRequestHeader.setConsumerGroup(consumerGroup);
+        ackMessageRequestHeader.setExtraInfo(extraInfo);
+
+        mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), timeoutMillis, ackCallback, ackMessageRequestHeader);
+    }
 }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index 0dec776..8de2726 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -25,6 +25,7 @@
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager;
+import org.apache.rocketmq.mqtt.common.facade.WillMsgSender;
 import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
@@ -38,13 +39,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+
 import javax.annotation.Resource;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 
 @Component
-public class PublishProcessor implements UpstreamProcessor {
+public class PublishProcessor implements UpstreamProcessor, WillMsgSender {
     private static Logger logger = LoggerFactory.getLogger(PublishProcessor.class);
     @Resource
     private LmqQueueStore lmqQueueStore;
@@ -60,6 +62,12 @@
 
     @Override
     public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) {
+        CompletableFuture<StoreResult> r = put(context, mqttMessage);
+        return r.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
+                JSON.toJSONBytes(storeResult)));
+    }
+
+    public CompletableFuture<StoreResult> put(MqttMessageUpContext context, MqttMessage mqttMessage) {
         MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
         boolean isEmpty = false;
         //deal empty payload
@@ -98,9 +106,14 @@
         message.setBornTimestamp(bornTime);
         message.setEmpty(isEmpty);
 
-        CompletableFuture<StoreResult> storeResultFuture = lmqQueueStore.putMessage(queueNames, message);
-        return storeResultFuture.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
-            JSON.toJSONBytes(storeResult)));
+        return lmqQueueStore.putMessage(queueNames, message);
     }
 
+
+    @Override
+    public CompletableFuture<StoreResult> sendWillMsg(String clientId, MqttPublishMessage message) {
+        MqttMessageUpContext ctx = new MqttMessageUpContext();
+        ctx.setClientId(clientId);
+        return put(ctx, message);
+    }
 }
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java
index d638e97..db0011f 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java
@@ -25,14 +25,14 @@
 import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
 import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java
index 2e1f3a8..038b4d4 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java
@@ -110,7 +110,7 @@
         String ip = "xxxx";
         String startClientKey = ip + Constants.CTRL_0;
         String endClientKey = ip + Constants.CTRL_2;
-        willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
+        willMsgPersistManager.scan(startClientKey, endClientKey, 100).whenComplete((willMap, throwable) -> {
             if (willMap == null || throwable != null) {
                 return;
             }
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
index a82c593..5ec114e 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
@@ -37,6 +37,7 @@
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.mqtt.common.model.Constants;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.MessageEvent;
@@ -201,7 +202,7 @@
         when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI);
         when(mqClientInstance.findBrokerAddressInPublish(anyString())).thenReturn(
                 String.valueOf(new FindBrokerResult("test", false)));
-        when(mqClientAPI.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(maxOffset);
+        when(mqClientAPI.getMaxOffset(anyString(), any(MessageQueue.class), anyLong())).thenReturn(maxOffset);
 
         CompletableFuture<Long> queryOffsetFuture = lmqQueueStoreManager.queryQueueMaxOffset(queue);
         verify(mqClientInstance, times(0)).updateTopicRouteInfoFromNameServer(any());
diff --git a/mqtt-exporter/pom.xml b/mqtt-exporter/pom.xml
index 708959a..12dc000 100644
--- a/mqtt-exporter/pom.xml
+++ b/mqtt-exporter/pom.xml
@@ -27,6 +27,10 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
index 60a0a1e..201bb06 100644
--- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -111,6 +111,10 @@
     }
 
     private static void collect(MqttMetricsInfo mqttMetricsInfo, long val, String... labels) throws PrometheusException {
+        if (!initialized) {
+            return;
+        }
+
         Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(mqttMetricsInfo.getType());
         if (mqttMetricsInfoCollectorTypeMap == null) {
             throw new PrometheusException("mqttMetricsInfo unregistered or collector type not support: " + mqttMetricsInfo);
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
index 376ab63..3b7c793 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
@@ -23,47 +23,46 @@
 import java.util.Properties;
 
 import org.apache.rocketmq.common.MixAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.stereotype.Component;
 
 @Component
 public class MetaConf {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MetaConf.class);
     private static final String CONF_FILE_NAME = "meta.conf";
     private File confFile;
-    private String clusterName = "defaultCluster";
     private String allNodeAddress;
-    private String dbPath = System.getProperty("user.home") + "/mqtt_meta/db";
-    private String raftDataPath = System.getProperty("user.home") + "/mqtt_meta/raft";
     private int metaPort = 25000;
     private String selfAddress;
+    private int raftNodePort = 8081;
     private String membersAddress;
-    private int maxRetainedTopicNum =  10000;
+    private int maxRetainedTopicNum = 10000;
     private int electionTimeoutMs = 1000;
     private int snapshotIntervalSecs = 60 * 1000;
     private String raftServiceName = System.getenv("RaftServiceName");
 
+    private int scanNum = 10000;
+
     public MetaConf() throws IOException {
-        ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
-        InputStream in = classPathResource.getInputStream();
-        Properties properties = new Properties();
-        properties.load(in);
-        in.close();
-        MixAll.properties2Object(properties, this);
-        this.confFile = new File(classPathResource.getURL().getFile());
+        try {
+            ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
+            InputStream in = classPathResource.getInputStream();
+            Properties properties = new Properties();
+            properties.load(in);
+            in.close();
+            MixAll.properties2Object(properties, this);
+            this.confFile = new File(classPathResource.getURL().getFile());
+        } catch (Exception e) {
+            LOGGER.error("", e);
+        }
     }
 
     public File getConfFile() {
         return confFile;
     }
 
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
     public String getAllNodeAddress() {
         return allNodeAddress;
     }
@@ -72,22 +71,6 @@
         this.allNodeAddress = allNodeAddress;
     }
 
-    public String getDbPath() {
-        return dbPath;
-    }
-
-    public void setDbPath(String dbPath) {
-        this.dbPath = dbPath;
-    }
-
-    public String getRaftDataPath() {
-        return raftDataPath;
-    }
-
-    public void setRaftDataPath(String raftDataPath) {
-        this.raftDataPath = raftDataPath;
-    }
-
     public int getMetaPort() {
         return metaPort;
     }
@@ -143,4 +126,20 @@
     public void setRaftServiceName(String raftServiceName) {
         this.raftServiceName = raftServiceName;
     }
+
+    public int getRaftNodePort() {
+        return raftNodePort;
+    }
+
+    public void setRaftNodePort(int raftNodePort) {
+        this.raftNodePort = raftNodePort;
+    }
+
+    public int getScanNum() {
+        return scanNum;
+    }
+
+    public void setScanNum(int scanNum) {
+        this.scanNum = scanNum;
+    }
 }
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java
index 0282824..19cbd32 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java
@@ -48,6 +48,9 @@
     @PostConstruct
     public void start() {
         confFile = metaConf.getConfFile();
+        if (confFile == null) {
+            return;
+        }
         gmt.set(confFile.lastModified());
         scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("ConnectConfListener"));
         scheduler.scheduleWithFixedDelay(() -> {
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttApplyListener.java
similarity index 70%
rename from mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttApplyListener.java
index e9635bf..16e522b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttApplyListener.java
@@ -15,14 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.mqtt.common.meta;
+package org.apache.rocketmq.mqtt.meta.raft;
 
-public class Constants {
+import com.google.protobuf.Message;
+import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
 
-    public static final String CATEGORY_RETAINED_MSG = "retainedMsg";
-    public static final String CATEGORY_WILL_MSG = "willMsg";
+public interface MqttApplyListener {
 
-    public static final String NOT_FOUND = "NOT_FOUND";
-
-    public static final String READ_INDEX_TYPE = "readIndexType";
+    /**
+     * never to block
+     *
+     * @param message
+     * @param rocksDBEngine
+     */
+    void onApply(Message message, RocksDBEngine rocksDBEngine);
 }
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
index 5c1b428..5c376c3 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
@@ -46,7 +46,9 @@
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.HostInfo;
 import org.apache.rocketmq.mqtt.meta.config.MetaConf;
+import org.apache.rocketmq.mqtt.meta.raft.processor.HashKvStateProcessor;
 import org.apache.rocketmq.mqtt.meta.raft.processor.RetainedMsgStateProcessor;
 import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
 import org.apache.rocketmq.mqtt.meta.raft.processor.WillMsgStateProcessor;
@@ -93,6 +95,21 @@
     private Map<String, MqttStateMachine> bizStateMachineMap = new ConcurrentHashMap<>();
     public String[] raftGroups;
     private RouteTable rt;
+    private HostInfo hostInfo = HostInfo.getInstall();
+
+    private MqttApplyListener mqttApplyListener;
+
+    public MqttApplyListener getMqttApplyListener() {
+        return mqttApplyListener;
+    }
+
+    public void setMqttApplyListener(MqttApplyListener mqttApplyListener) {
+        this.mqttApplyListener = mqttApplyListener;
+    }
+
+    public MetaConf getMetaConf() {
+        return metaConf;
+    }
 
     @PostConstruct
     void init() throws IOException, RocksDBException {
@@ -111,11 +128,20 @@
                 new LinkedBlockingQueue<>(10000),
                 new ThreadFactoryImpl("requestExecutor_"));
 
-        registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedTopicNum()));  //add retained msg processor
+        registerStateProcessor(new RetainedMsgStateProcessor(this));  //add retained msg processor
         registerStateProcessor(new WillMsgStateProcessor(this));
+        registerStateProcessor(new HashKvStateProcessor(this));
 
         rt = RouteTable.getInstance();
-        localPeerId = PeerId.parsePeer(metaConf.getSelfAddress());
+        if (StringUtils.isNotBlank(metaConf.getSelfAddress())) {
+            localPeerId = PeerId.parsePeer(metaConf.getSelfAddress());
+        } else {
+            String localDomain = hostInfo.getAddress();
+            if (StringUtils.isNotBlank(metaConf.getRaftServiceName())) {
+                localDomain = hostInfo.getName() + "." + metaConf.getRaftServiceName();
+            }
+            this.localPeerId = new PeerId(localDomain, metaConf.getRaftNodePort());
+        }
         rpcServer = createRpcServer(this, localPeerId);
         NodeManager.getInstance().addAddress(localPeerId.getEndpoint());
         if (!rpcServer.init(null)) {
@@ -129,7 +155,7 @@
             FileUtils.forceMkdir(new File(rdbPath));
             RocksDBEngine rocksDBEngine = new RocksDBEngine(rdbPath);
             rocksDBEngine.init();
-            MqttStateMachine sm = new MqttStateMachine(this);
+            MqttStateMachine sm = new MqttStateMachine(this, group);
             sm.setRocksDBEngine(rocksDBEngine);
             createRaftNode(group, sm);
         }
@@ -162,7 +188,7 @@
         nodeOptions.setSnapshotIntervalSecs(metaConf.getSnapshotIntervalSecs());
 
         final Configuration initConf = new Configuration();
-        String initConfStr = metaConf.getMembersAddress();
+        String initConfStr = initConfStr();
         if (!initConf.parse(initConfStr)) {
             throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
         }
@@ -180,6 +206,21 @@
         return node;
     }
 
+    public String initConfStr() {
+        if (StringUtils.isNotBlank(metaConf.getMembersAddress())) {
+            return metaConf.getMembersAddress();
+        }
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 3; i++) {
+            sb.append(metaConf.getRaftServiceName() + "-" + i);
+            sb.append("." + metaConf.getRaftServiceName());
+            sb.append(":" + metaConf.getRaftNodePort());
+            sb.append(",");
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        return sb.toString();
+    }
+
     private void registerBizStateMachine(String groupId, MqttStateMachine sm) {
         MqttStateMachine prv = bizStateMachineMap.putIfAbsent(groupId, sm);
         if (prv != null) {
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
index b763a29..8422065 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
@@ -29,6 +29,7 @@
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
 import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
 import org.slf4j.Logger;
@@ -43,64 +44,58 @@
     protected Node node;
     protected RocksDBEngine rocksDBEngine;
     protected final MqttRaftServer server;
+    private String group;
 
-    public MqttStateMachine(MqttRaftServer server) {
+    public MqttStateMachine(MqttRaftServer server, String group) {
         this.server = server;
+        this.group = group;
     }
 
     @Override
     public void onApply(Iterator iterator) {
-        int index = 0;
-        int applied = 0;
         Message message;
         MqttClosure closure = null;
-        try {
-            while (iterator.hasNext()) {
-                Status status = Status.OK();
-                try {
-                    if (iterator.done() != null) {
-                        closure = (MqttClosure) iterator.done();
-                        message = closure.getMessage();
-                    } else {
-                        final ByteBuffer data = iterator.getData();
-                        message = parseMessage(data.array());
+        while (iterator.hasNext()) {
+            long start = System.currentTimeMillis();
+            Status status = Status.OK();
+            try {
+                if (iterator.done() != null) {
+                    closure = (MqttClosure) iterator.done();
+                    message = closure.getMessage();
+                } else {
+                    final ByteBuffer data = iterator.getData();
+                    message = parseMessage(data.array());
+                }
+                if (message instanceof WriteRequest) {
+                    WriteRequest writeRequest = (WriteRequest) message;
+                    StateProcessor processor = server.getProcessor(writeRequest.getCategory());
+                    Response response = processor.onWriteRequest((WriteRequest) message);
+                    if (Objects.nonNull(closure)) {
+                        closure.setResponse(response);
                     }
-
-                    LOGGER.debug("get message:{} and apply to state machine", message);
-
-                    if (message instanceof WriteRequest) {
-                        WriteRequest writeRequest = (WriteRequest) message;
-                        StateProcessor processor = server.getProcessor(writeRequest.getCategory());
-                        Response response = processor.onWriteRequest((WriteRequest) message);
-                        if (Objects.nonNull(closure)) {
-                            closure.setResponse(response);
-                        }
-                    }
-
-                    if (message instanceof ReadRequest) {
-                        ReadRequest request = (ReadRequest) message;
-                        StateProcessor processor = server.getProcessor(request.getCategory());
-                        Response response = processor.onReadRequest((ReadRequest) message);
-                        if (Objects.nonNull(closure)) {
-                            closure.setResponse(response);
-                        }
-                    }
-                } catch (Throwable e) {
-                    index++;
-                    status.setError(RaftError.UNKNOWN, e.toString());
-                    Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
-                    throw e;
-                } finally {
-                    Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
                 }
 
-                applied++;
-                index++;
-                iterator.next();
+                if (message instanceof ReadRequest) {
+                    ReadRequest request = (ReadRequest) message;
+                    StateProcessor processor = server.getProcessor(request.getCategory());
+                    Response response = processor.onReadRequest((ReadRequest) message);
+                    if (Objects.nonNull(closure)) {
+                        closure.setResponse(response);
+                    }
+                }
+                MqttApplyListener applyListener = server.getMqttApplyListener();
+                if (applyListener != null) {
+                    applyListener.onApply(message, rocksDBEngine);
+                }
+            } catch (Throwable e) {
+                LOGGER.error("stateMachine meet critical error", e);
+                status.setError(RaftError.UNKNOWN, e.toString());
+                Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
+            } finally {
+                Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
+                StatUtil.addInvoke(StatUtil.buildKey(group, "apply"), System.currentTimeMillis() - start);
             }
-        } catch (Throwable t) {
-            LOGGER.error("stateMachine meet critical error", t);
-            //iterator.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
+            iterator.next();
         }
     }
 
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/HashKvStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/HashKvStateProcessor.java
new file mode 100644
index 0000000..935b646
--- /dev/null
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/HashKvStateProcessor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.meta.raft.processor;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.google.protobuf.ByteString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
+import org.apache.rocketmq.mqtt.common.model.consistency.Response;
+import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
+import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
+import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_HASH_KV;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_HASH_KV_FIELD;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_DEL;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_DEL_HASH;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_GET;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_GET_HASH;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_PUT;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_PUT_HASH;
+
+public class HashKvStateProcessor extends StateProcessor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HashKvStateProcessor.class);
+    private MqttRaftServer server;
+
+    public HashKvStateProcessor(MqttRaftServer server) {
+        this.server = server;
+    }
+
+    @Override
+    public Response onReadRequest(ReadRequest request) throws Exception {
+        long start = System.currentTimeMillis();
+        try {
+            MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
+            if (sm == null) {
+                LOGGER.error("Fail to process will ReadRequest , Not Found SM for {}", request.getGroup());
+                return null;
+            }
+            String operation = request.getOperation();
+            String key = request.getKey();
+            if (OP_KV_GET.equals(operation)) {
+                byte[] value = getRdb(sm.getRocksDBEngine(), key.getBytes(StandardCharsets.UTF_8));
+                if (value == null) {
+                    return Response.newBuilder()
+                            .setSuccess(true)
+                            .build();
+                }
+                return Response.newBuilder()
+                        .setSuccess(true)
+                        .setData(ByteString.copyFrom(value))
+                        .build();
+            } else if (OP_KV_GET_HASH.equals(operation)) {
+                byte[] value = getRdb(sm.getRocksDBEngine(), key.getBytes(StandardCharsets.UTF_8));
+                if (value == null) {
+                    return Response.newBuilder()
+                            .setSuccess(true)
+                            .build();
+                }
+                Map<String, String> map = JSONObject.parseObject(
+                        new String(value, StandardCharsets.UTF_8),
+                        new TypeReference<Map<String, String>>() {
+                        });
+                String field = null;
+                if (request.getExtDataMap() != null) {
+                    field = request.getExtDataMap().get(OP_HASH_KV_FIELD);
+                }
+                Map<String, String> result = new HashMap<>();
+                if (StringUtils.isNotBlank(field)) {
+                    result.put(field, map.get(field));
+                } else {
+                    result.putAll(map);
+                }
+                return Response.newBuilder()
+                        .setSuccess(true)
+                        .putAllDataMap(result)
+                        .build();
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Fail to process will ReadRequest, k {}", request.getKey(), e);
+            throw e;
+        } finally {
+            StatUtil.addInvoke("HashKvRead", System.currentTimeMillis() - start);
+        }
+        return null;
+    }
+
+    @Override
+    public Response onWriteRequest(WriteRequest log) throws Exception {
+        long start = System.currentTimeMillis();
+        try {
+            MqttStateMachine sm = server.getMqttStateMachine(log.getGroup());
+            if (sm == null) {
+                LOGGER.error("Fail to process will WriteRequest , Not Found SM for {}", log.getGroup());
+                return null;
+            }
+            String operation = log.getOperation();
+            String key = log.getKey();
+            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+            byte[] value = log.getData().toByteArray();
+            if (value == null) {
+                return null;
+            }
+            if (OP_KV_PUT.equals(operation)) {
+                return put(sm.getRocksDBEngine(), keyBytes, value);
+            } else if (OP_KV_DEL.equals(operation)) {
+                return delete(sm.getRocksDBEngine(), keyBytes);
+            } else if (OP_KV_PUT_HASH.equals(operation)) {
+                Map<String, String> map = new HashMap<>();
+                map.putAll(log.getExtDataMap());
+                byte[] oldValue = getRdb(sm.getRocksDBEngine(), keyBytes);
+                if (oldValue != null && oldValue.length > 1) {
+                    Map<String, String> oldMap = JSONObject.parseObject(
+                            new String(oldValue, StandardCharsets.UTF_8),
+                            new TypeReference<Map<String, String>>() {
+                            });
+                    oldMap.putAll(map);
+                    map.putAll(oldMap);
+                }
+                return put(sm.getRocksDBEngine(), keyBytes, JSON.toJSONBytes(map));
+            } else if (OP_KV_DEL_HASH.equals(operation)) {
+                String field = log.getExtDataMap().get(OP_HASH_KV_FIELD);
+                if (StringUtils.isBlank(field)) {
+                    return Response.newBuilder()
+                            .setSuccess(false)
+                            .setErrMsg("No Found Field")
+                            .build();
+                }
+                byte[] oldValue = getRdb(sm.getRocksDBEngine(), keyBytes);
+                if (oldValue == null || oldValue.length < 1) {
+                    return Response.newBuilder()
+                            .setSuccess(true)
+                            .build();
+                }
+                Map<String, String> oldMap = JSONObject.parseObject(
+                        new String(oldValue, StandardCharsets.UTF_8),
+                        new TypeReference<Map<String, String>>() {
+                        });
+                if (!oldMap.containsKey(field)) {
+                    return Response.newBuilder()
+                            .setSuccess(true)
+                            .build();
+                }
+                oldMap.remove(field);
+                if (oldMap.isEmpty()) {
+                    return delete(sm.getRocksDBEngine(), keyBytes);
+                } else {
+                    return put(sm.getRocksDBEngine(), keyBytes, JSON.toJSONBytes(oldMap));
+                }
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Fail to process will WriteRequest, k {}", log.getKey(), e);
+            throw e;
+        } finally {
+            StatUtil.addInvoke("HashKvWrite", System.currentTimeMillis() - start);
+        }
+        return null;
+    }
+
+    @Override
+    public String groupCategory() {
+        return CATEGORY_HASH_KV;
+    }
+}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
index f12306b..deca317 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
@@ -19,14 +19,17 @@
 
 import com.alibaba.fastjson.JSON;
 import com.google.protobuf.ByteString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
 import org.apache.rocketmq.mqtt.common.model.Trie;
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
+import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
 import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
 import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,33 +39,37 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_OPERATION_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_IS_EMPTY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_EXPIRE;
+
 
 public class RetainedMsgStateProcessor extends StateProcessor {
     private static Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcessor.class);
     private final ConcurrentHashMap<String, Trie<String, String>> retainedMsgTopicTrie = new ConcurrentHashMap<>();  //key:firstTopic value:retained topic Trie
     private MqttRaftServer server;
-    private int maxRetainedTopicNum;
 
-    public RetainedMsgStateProcessor(MqttRaftServer server, int maxRetainedTopicNum) {
+    public RetainedMsgStateProcessor(MqttRaftServer server) {
         this.server = server;
-        this.maxRetainedTopicNum = maxRetainedTopicNum;
     }
 
     @Override
     public Response onReadRequest(ReadRequest request) {
+        long start = System.currentTimeMillis();
         try {
             MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
             if (sm == null) {
                 logger.error("Fail to process RetainedMsg ReadRequest , Not Found SM for {}", request.getGroup());
                 return null;
             }
-            String topic = request.getExtDataMap().get("topic");
-            String firstTopic = request.getExtDataMap().get("firstTopic");
+            String topic = request.getExtDataMap().get(RETAIN_REQ_READ_PARAM_TOPIC);
+            String firstTopic = request.getExtDataMap().get(RETAIN_REQ_READ_PARAM_FIRST_TOPIC);
             String operation = request.getOperation();
-
-            logger.info("FirstTopic:{} Topic:{} Operation:{}", firstTopic, topic, operation);
-
-            if (operation.equals("topic")) {    //return retained msg
+            if (operation.equals(RETAIN_REQ_READ_PARAM_OPERATION_TOPIC)) {    //return retained msg
                 return get(sm.getRocksDBEngine(), topic.getBytes(StandardCharsets.UTF_8));
             } else { //return retain msgs of matched Topic
                 String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
@@ -95,24 +102,27 @@
                         .addAllDatalist(msgResults)//return retained msgs of matched Topic
                         .build();
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("", e);
             return Response.newBuilder()
                     .setSuccess(false)
                     .setErrMsg(e.getMessage())
                     .build();
+        } finally {
+            StatUtil.addInvoke("RetainRead", System.currentTimeMillis() - start);
         }
     }
 
-    boolean setRetainedMsg(RocksDBEngine rocksDBEngine, String firstTopic, String topic, boolean isEmpty, byte[] msg) throws Exception {
+    boolean setRetainedMsg(RocksDBEngine rocksDBEngine, String firstTopic, String topic, boolean isEmpty, byte[] msg, Long expire) throws Exception {
         String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
         // if the trie of firstTopic doesn't exist
         if (!retainedMsgTopicTrie.containsKey(wrapTrieFirstTopic)) {
             retainedMsgTopicTrie.put(wrapTrieFirstTopic, new Trie<String, String>());
         }
         if (isEmpty) {
-            //delete from trie
-            logger.info("Delete the topic {} retained message", topic);
+            if (expire != null && !checkExpire(rocksDBEngine, expire, topic)) {
+                return true;
+            }
             delete(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8));
             Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
             if (trie != null) {
@@ -122,7 +132,7 @@
         } else {
             //Add to trie
             Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
-            if (trie.getNodePath().size() < maxRetainedTopicNum) {
+            if (trie.getNodePath().size() < server.getMetaConf().getMaxRetainedTopicNum()) {
                 put(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8), msg);
                 trie.addNode(topic, "", "");
                 put(rocksDBEngine, wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
@@ -134,48 +144,63 @@
         return true;
     }
 
+    private boolean checkExpire(RocksDBEngine rocksDBEngine, long expire, String topic) {
+        try {
+            byte[] value = getRdb(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8));
+            if (value == null) {
+                return true;
+            }
+            return System.currentTimeMillis() - StoreMessage.parseFrom(value).getBornTimestamp() > expire;
+        } catch (Throwable t) {
+            logger.error("", t);
+        }
+        return false;
+    }
+
     private String wrapTrieFirstTopic(String firstTopic) {
         return "$" + firstTopic + "$";
     }
 
     @Override
     public Response onWriteRequest(WriteRequest writeRequest) {
+        long start = System.currentTimeMillis();
         try {
             MqttStateMachine sm = server.getMqttStateMachine(writeRequest.getGroup());
             if (sm == null) {
                 logger.error("Fail to process RetainedMsg WriteRequest , Not Found SM for {}", writeRequest.getGroup());
                 return null;
             }
-            String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic"));     //retained msg firstTopic
-            String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic"));     //retained msg topic
-            boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty"));     //retained msg is empty
+            String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC));     //retained msg firstTopic
+            String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_TOPIC));     //retained msg topic
+            boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_IS_EMPTY));     //retained msg is empty
+            String expireStr = writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_EXPIRE);
+            Long expire = StringUtils.isNotBlank(expireStr) ? Long.parseLong(expireStr) : null;
             byte[] message = writeRequest.getData().toByteArray();
-            boolean res = setRetainedMsg(sm.getRocksDBEngine(), firstTopic, topic, isEmpty, message);
+            boolean res = setRetainedMsg(sm.getRocksDBEngine(), firstTopic, topic, isEmpty, message, expire);
             if (!res) {
-                logger.warn("Put the topic {} retained message failed! Exceeded maximum number of reserved topics limit.", topic);
                 return Response.newBuilder()
                         .setSuccess(false)
-                        .setErrMsg("Exceeded maximum number of reserved topics limit.")
+                        .setErrMsg("f")
                         .build();
             }
-            logger.info("Put the topic {} retained message success!", topic);
-
             return Response.newBuilder()
                     .setSuccess(true)
                     .setData(ByteString.copyFrom(JSON.toJSONBytes(topic)))
                     .build();
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("Put the retained message error!", e);
             return Response.newBuilder()
                     .setSuccess(false)
                     .setErrMsg(e.getMessage())
                     .build();
+        } finally {
+            StatUtil.addInvoke("RetainWrite", System.currentTimeMillis() - start);
         }
     }
 
     @Override
     public String groupCategory() {
-        return Constants.CATEGORY_RETAINED_MSG;
+        return MetaConstants.CATEGORY_RETAINED_MSG;
     }
 
 }
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
index 3fe771c..e2fef2b 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
@@ -22,7 +22,7 @@
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
 import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
@@ -70,7 +70,7 @@
         try {
             byte[] value = rocksDBEngine.getRdb().get(key);
             if (value == null) {
-                value = Constants.NOT_FOUND.getBytes();
+                value = MetaConstants.NOT_FOUND.getBytes();
             }
             return Response.newBuilder()
                     .setSuccess(true)
@@ -153,7 +153,7 @@
         }
     }
 
-    public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey) throws Exception {
+    public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey, long scanNum) throws Exception {
         Map<String, String> result = new HashMap<>();
         final Lock readLock = rocksDBEngine.getReadWriteLock().readLock();
         readLock.lock();
@@ -170,6 +170,9 @@
                     break;
                 }
                 result.put(new String(key), new String(it.value()));
+                if (result.size() >= scanNum) {
+                    break;
+                }
                 it.next();
             }
             return Response.newBuilder()
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
index 5376a99..2e44904 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
@@ -17,16 +17,27 @@
 
 package org.apache.rocketmq.mqtt.meta.raft.processor;
 
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
 import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_WILL_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_WILL_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_END_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_GET;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN_NUM;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_START_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_COMPARE_AND_PUT;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_DELETE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_EXPECT_VALUE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_PUT;
 
 public class WillMsgStateProcessor extends StateProcessor {
     private static Logger logger = LoggerFactory.getLogger(WillMsgStateProcessor.class);
@@ -38,6 +49,7 @@
 
     @Override
     public Response onReadRequest(ReadRequest request) throws Exception {
+        long start = System.currentTimeMillis();
         try {
             MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
             if (sm == null) {
@@ -46,27 +58,35 @@
             }
             String operation = request.getOperation();
             String key = request.getKey();
-            if ("get".equals(operation)) {
+            if (WILL_REQ_READ_GET.equals(operation)) {
                 return get(sm.getRocksDBEngine(), key.getBytes());
-            } else if ("scan".equals(operation)) {
-                String startKey = request.getExtDataMap().get("startKey");
-                String endKey = request.getExtDataMap().get("endKey");
-                return scan(sm.getRocksDBEngine(), startKey.getBytes(), endKey.getBytes());
+            } else if (WILL_REQ_READ_SCAN.equals(operation)) {
+                String startKey = request.getExtDataMap().get(WILL_REQ_READ_START_KEY);
+                String endKey = request.getExtDataMap().get(WILL_REQ_READ_END_KEY);
+                String scanNumStr = request.getExtDataMap().get(WILL_REQ_READ_SCAN_NUM);
+                long scanNum = server.getMetaConf().getScanNum();
+                if (StringUtils.isNotBlank(scanNumStr)) {
+                    scanNum = Long.parseLong(scanNumStr);
+                }
+                return scan(sm.getRocksDBEngine(), startKey.getBytes(), endKey.getBytes(), scanNum);
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             if (request.getKey() == null) {
                 logger.error("Fail to delete, startKey {}, endKey {}", request.getExtDataMap().get("startKey"), request.getExtDataMap().get("endKey"), e);
             } else {
                 logger.error("Fail to process will ReadRequest, k {}", request.getKey(), e);
             }
-
             throw e;
+        } finally {
+            StatUtil.addInvoke("WillRead", System.currentTimeMillis() - start);
         }
+
         return null;
     }
 
     @Override
     public Response onWriteRequest(WriteRequest log) throws Exception {
+        long start = System.currentTimeMillis();
         try {
             MqttStateMachine sm = server.getMqttStateMachine(log.getGroup());
             if (sm == null) {
@@ -77,20 +97,22 @@
             String key = log.getKey();
             byte[] value = log.getData().toByteArray();
 
-            if ("put".equals(operation)) {
+            if (WILL_REQ_WRITE_PUT.equals(operation)) {
                 return put(sm.getRocksDBEngine(), key.getBytes(), value);
-            } else if ("delete".equals(operation)) {
+            } else if (WILL_REQ_WRITE_DELETE.equals(operation)) {
                 return delete(sm.getRocksDBEngine(), key.getBytes());
-            } else if ("compareAndPut".equals(operation)) {
-                String expectValue = log.getExtDataMap().get("expectValue");
-                if (Constants.NOT_FOUND.equals(expectValue)) {
+            } else if (WILL_REQ_WRITE_COMPARE_AND_PUT.equals(operation)) {
+                String expectValue = log.getExtDataMap().get(WILL_REQ_WRITE_EXPECT_VALUE);
+                if (MetaConstants.NOT_FOUND.equals(expectValue)) {
                     return compareAndPut(sm.getRocksDBEngine(), key.getBytes(), null, value);
                 }
                 return compareAndPut(sm.getRocksDBEngine(), key.getBytes(), log.getExtDataMap().get("expectValue").getBytes(), value);
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("Fail to process will WriteRequest, k {}", log.getKey(), e);
             throw e;
+        } finally {
+            StatUtil.addInvoke("WillWrite", System.currentTimeMillis() - start);
         }
         return null;
     }
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
index 08463f2..3fb3e1c 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
@@ -128,7 +128,7 @@
                                 Response response = processor.onReadRequest(request);
                                 rpcCtx.sendResponse(response);
                             } catch (Throwable t) {
-                                LOGGER.info("process read request in handleReadIndex error : {}", t.toString());
+                                LOGGER.error("process read request in handleReadIndex error : {}", t.toString());
                                 rpcCtx.sendResponse(Response.newBuilder().setErrMsg(t.toString()).setSuccess(false).build());
                             }
                             return;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
index f8d6755..fd436de 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
@@ -19,14 +19,22 @@
 
 import com.alipay.sofa.jraft.rpc.RpcContext;
 import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
+import org.apache.rocketmq.mqtt.common.model.consistency.Response;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
+import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
 
 /**
  * The RPC Processor for read request.
  */
 public class MqttReadRpcProcessor extends AbstractRpcProcessor implements RpcProcessor<ReadRequest> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MqttReadRpcProcessor.class);
     private final MqttRaftServer server;
 
     public MqttReadRpcProcessor(MqttRaftServer server) {
@@ -35,13 +43,32 @@
 
     @Override
     public void handleRequest(RpcContext rpcCtx, ReadRequest request) {
-        if (Constants.READ_INDEX_TYPE.equals(request.getType())) {
+        StatUtil.addPv(StatUtil.buildKey("ReadRpc", request.getGroup()), 1);
+        if (MetaConstants.READ_INDEX_TYPE.equals(request.getType())) {
             handleReadIndex(server, request.getGroup(), rpcCtx, request);
+        } else if (MetaConstants.ANY_READ_TYPE.equals(request.getType())) {
+            anyRead(rpcCtx, request);
         } else {
             handleRequest(server, request.getGroup(), rpcCtx, request);
         }
     }
 
+    private void anyRead(RpcContext rpcCtx, ReadRequest request) {
+        final StateProcessor processor = server.getProcessor(request.getCategory());
+        if (Objects.isNull(processor)) {
+            rpcCtx.sendResponse(Response.newBuilder().setSuccess(false)
+                    .setErrMsg("Could not find the StateProcessor: " + request.getCategory()).build());
+            return;
+        }
+        try {
+            Response response = processor.onReadRequest(request);
+            rpcCtx.sendResponse(response);
+        } catch (Throwable t) {
+            LOGGER.error("process read request in anyRead error : {}", t.toString());
+            rpcCtx.sendResponse(Response.newBuilder().setErrMsg(t.toString()).setSuccess(false).build());
+        }
+    }
+
     @Override
     public String interest() {
         return ReadRequest.class.getName();
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
index 80f0569..92f36e5 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
@@ -20,6 +20,7 @@
 import com.alipay.sofa.jraft.rpc.RpcContext;
 import com.alipay.sofa.jraft.rpc.RpcProcessor;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
 
 /**
@@ -34,6 +35,7 @@
 
     @Override
     public void handleRequest(RpcContext rpcCtx, WriteRequest request) {
+        StatUtil.addPv(StatUtil.buildKey("WriteRpc", request.getGroup()), 1);
         handleRequest(server, request.getGroup(), rpcCtx, request);
     }
 
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/MetaStartup.java
similarity index 85%
rename from mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/MetaStartup.java
index 8b5e8b3..1adab2e 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/MetaStartup.java
@@ -16,16 +16,18 @@
   */
 package org.apache.rocketmq.mqtt.meta.starter;
 
-import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.mqtt.meta.util.SpringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
-public class Startup {
+public class MetaStartup {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MetaStartup.class);
+
     public static void main(String[] args) {
-        System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
 
         ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:meta_spring.xml");
         SpringUtil.setApplicationContext(applicationContext);
-        System.out.println("start meta ...");
+        LOGGER.info("start meta ...");
     }
 }
\ No newline at end of file
diff --git a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
index e593527..577d29a 100644
--- a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
+++ b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
@@ -37,7 +37,7 @@
 import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +61,7 @@
     String originTopic = "test-f1/f2/";
 
     String topicFilter = "test-f1/+/";
-    final String groupId = Constants.CATEGORY_RETAINED_MSG + "-" + 0;
+    final String groupId = MetaConstants.CATEGORY_RETAINED_MSG + "-" + 0;
     final String confStr = "127.0.0.1:25001";
     CliClientServiceImpl cliClientService = new CliClientServiceImpl();
     Configuration conf = new Configuration();
@@ -182,7 +182,7 @@
         option.put("flag", "topic");
         option.put("topic", firstTopic + "/t1/");
 
-        final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedmsg-0").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();
+        final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedmsg-0").setType(MetaConstants.READ_INDEX_TYPE).putAllExtData(option).build();
 
         CompletableFuture<Message> future = new CompletableFuture<>();
 
@@ -242,7 +242,7 @@
             .addAllDatalist(msgResults)
             .build());
 
-        final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedMsg-0").setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();
+        final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedMsg-0").setOperation("trie").setType(MetaConstants.READ_INDEX_TYPE).putAllExtData(option).build();
 
         try {
             cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
diff --git a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java
index bcbb013..08690d7 100644
--- a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java
+++ b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java
@@ -33,7 +33,7 @@
 import java.io.File;
 import java.io.IOException;
 
-import static org.apache.rocketmq.mqtt.common.meta.Constants.NOT_FOUND;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.NOT_FOUND;
 
 @RunWith(MockitoJUnitRunner.class)
 public class WillMsgStateProcessorTest {
@@ -128,7 +128,7 @@
         Response response1 = willMsgStateProcessor.put(rocksDBEngine, key1.getBytes(), value1.getBytes());
         Assert.assertTrue(response1.getSuccess());
 
-        Response scanResponse = willMsgStateProcessor.scan(rocksDBEngine, ("k1" + CTRL_0).getBytes(), ("k1" + CTRL_2).getBytes());
+        Response scanResponse = willMsgStateProcessor.scan(rocksDBEngine, ("k1" + CTRL_0).getBytes(), ("k1" + CTRL_2).getBytes(), 100);
         Assert.assertEquals(value, scanResponse.getDataMapMap().get(key));
         Assert.assertEquals(value1, scanResponse.getDataMapMap().get(key1));
     }
diff --git a/pom.xml b/pom.xml
index e928bc0..7e9ac4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
@@ -37,7 +38,7 @@
         <java.target.version>1.8</java.target.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <spring.version>4.3.16.RELEASE</spring.version>
-        <rocket.version>4.9.3</rocket.version>
+        <rocket.version>5.1.3</rocket.version>
         <prometheus.version>0.12.0</prometheus.version>
         <grpc-java.version>1.24.0</grpc-java.version>
         <proto-google-common-protos.version>1.17.0</proto-google-common-protos.version>
@@ -129,6 +130,11 @@
             <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-classic</artifactId>
+                <version>1.2.3</version>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
                 <version>1.2.9</version>
             </dependency>
             <dependency>